Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 30 additions & 36 deletions internal/stream/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,15 @@ func (p *StreamPipe) Close() error {
return nil
}

// blockResult holds the outcome of a single block download.
// Exactly one field is meaningful per result: a successful fetch carries
// the payload in data with err nil; a failed fetch carries a non-nil err.
type blockResult struct {
data []byte // downloaded block payload (possibly trimmed to the requested range)
err error // non-nil if the download failed; data is unset in that case
}

// prefetch runs in a goroutine, fetching blocks concurrently and sending to blockQueue.
// each block is pushed to the reader as soon as it's ready (in order), rather than
// waiting for the entire batch to complete — this minimizes time-to-first-byte.
func (p *StreamPipe) prefetch() {
defer close(p.blockQueue)

Expand All @@ -159,34 +167,24 @@ func (p *StreamPipe) prefetch() {
default:
}

// fetch a batch of blocks concurrently
// fetch a batch of blocks concurrently, each with its own result channel
batchSize := min(config.ValueOf.StreamConcurrency, totalBlocks-currentBlock)
blocks := make([][]byte, batchSize)

var wg sync.WaitGroup
var fetchErr error
var errMu sync.Mutex
results := make([]chan blockResult, batchSize)

for i := range batchSize {
wg.Add(1)
results[i] = make(chan blockResult, 1)
go func(idx int) {
defer wg.Done()

blockNum := currentBlock + idx
blockOffset := offset + int64(idx)*p.blockSize

data, err := p.downloadBlockWithRetry(blockOffset)
dataLen := int64(len(data))

if err != nil {
errMu.Lock()
if fetchErr == nil {
fetchErr = err
}
errMu.Unlock()
results[idx] <- blockResult{err: err}
return
}

dataLen := int64(len(data))

// trim first/last block to exact range
if totalBlocks == 1 {
if dataLen < rightTrim {
Expand All @@ -207,30 +205,26 @@ func (p *StreamPipe) prefetch() {
}
}

blocks[idx] = data
results[idx] <- blockResult{data: data}
}(i)
}

wg.Wait()

// handle errors
// ignore context cancellation cuz it's expected on disconnect
if fetchErr != nil {
if p.ctx.Err() == nil {
p.log.Error("block download failed", zap.Error(fetchErr))
}
return
}

// send blocks to queue in order
for _, block := range blocks {
if block == nil {
// a fetch failure that wasn't captured, should not happen but just in case.
p.log.Error("unexpected nil block in batch, aborting prefetch")
return
}
// drain results in order — block 0 is sent to reader as soon as it's ready,
// without waiting for blocks 1, 2, 3 to finish
for i := range batchSize {
select {
case p.blockQueue <- block:
case res := <-results[i]:
if res.err != nil {
if p.ctx.Err() == nil {
p.log.Error("block download failed", zap.Error(res.err))
}
return
}
select {
case p.blockQueue <- res.data:
case <-p.ctx.Done():
return
}
case <-p.ctx.Done():
return
}
Expand Down