From 54cc5164682194af3f698c26a7ae60deaa41bcce Mon Sep 17 00:00:00 2001 From: Jyothis Date: Fri, 6 Mar 2026 23:31:55 -0500 Subject: [PATCH] perf: improve prefetch logic with pipeline optimizations --- internal/stream/pipe.go | 66 +++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/internal/stream/pipe.go b/internal/stream/pipe.go index 0dc3f5b8..341398b0 100644 --- a/internal/stream/pipe.go +++ b/internal/stream/pipe.go @@ -138,7 +138,15 @@ func (p *StreamPipe) Close() error { return nil } +// blockResult holds the outcome of a single block download. +type blockResult struct { + data []byte + err error +} + // prefetch runs in a goroutine, fetching blocks concurrently and sending to blockQueue. +// each block is pushed to the reader as soon as it's ready (in order), rather than +// waiting for the entire batch to complete — this minimizes time-to-first-byte. func (p *StreamPipe) prefetch() { defer close(p.blockQueue) @@ -159,34 +167,24 @@ func (p *StreamPipe) prefetch() { default: } - // fetch a batch of blocks concurrently + // fetch a batch of blocks concurrently, each with its own result channel batchSize := min(config.ValueOf.StreamConcurrency, totalBlocks-currentBlock) - blocks := make([][]byte, batchSize) - - var wg sync.WaitGroup - var fetchErr error - var errMu sync.Mutex + results := make([]chan blockResult, batchSize) for i := range batchSize { - wg.Add(1) + results[i] = make(chan blockResult, 1) go func(idx int) { - defer wg.Done() - blockNum := currentBlock + idx blockOffset := offset + int64(idx)*p.blockSize data, err := p.downloadBlockWithRetry(blockOffset) - dataLen := int64(len(data)) - if err != nil { - errMu.Lock() - if fetchErr == nil { - fetchErr = err - } - errMu.Unlock() + results[idx] <- blockResult{err: err} return } + dataLen := int64(len(data)) + // trim first/last block to exact range if totalBlocks == 1 { if dataLen < rightTrim { @@ -207,30 +205,26 @@ func (p *StreamPipe) prefetch() { } } - blocks[idx] = data + results[idx] <- blockResult{data: data} }(i) } - wg.Wait() - - // handle errors - // ignore context cancellation cuz it's expected on disconnect - if fetchErr != nil { - if p.ctx.Err() == nil { - p.log.Error("block download failed", zap.Error(fetchErr)) - } - return - } - - // send blocks to queue in order - for _, block := range blocks { - if block == nil { - // a fetch failure that wasn't captured, should not happen but just in case. - p.log.Error("unexpected nil block in batch, aborting prefetch") - return - } + // drain results in order — block 0 is sent to reader as soon as it's ready, + // without waiting for blocks 1, 2, 3 to finish + for i := range batchSize { select { - case p.blockQueue <- block: + case res := <-results[i]: + if res.err != nil { + if p.ctx.Err() == nil { + p.log.Error("block download failed", zap.Error(res.err)) + } + return + } + select { + case p.blockQueue <- res.data: + case <-p.ctx.Done(): + return + } case <-p.ctx.Done(): return }