diff --git a/CHANGELOG.md b/CHANGELOG.md index b9e6dd94..891513e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224). - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276) - Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) +- Earlier backpressure from `BatchCompleter` when its throughput is saturated, with fewer warnings to console. [PR #1292](https://github.com/riverqueue/river/pull/1292) ### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 823394c4..13159473 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -257,9 +257,11 @@ type BatchCompleter struct { baseservice.BaseService startstop.BaseStartStop + backlogWaitThreshold int // configurable for testing purposes; backlog at which completions start waiting for the completer to catch up + batchReadyChan chan struct{} completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation disableSleep bool // disable sleep in testing - maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted + maxBacklog int // configurable for testing purposes; emergency backlog threshold where a warning is logged exec riverdriver.Executor pilot riverpilot.Pilot schema string @@ -273,19 +275,22 @@ type BatchCompleter struct { func NewBatchCompleter(archetype *baseservice.Archetype, schema string, exec riverdriver.Executor, pilot riverpilot.Pilot, subscribeCh SubscribeChan) *BatchCompleter { const ( - 
completionMaxSize = 5_000 - maxBacklog = 20_000 + completionMaxSize = 5_000 + backlogWaitThreshold = completionMaxSize * 2 + maxBacklog = 20_000 ) return baseservice.Init(archetype, &BatchCompleter{ - completionMaxSize: completionMaxSize, - exec: exec, - maxBacklog: maxBacklog, - pilot: pilot, - schema: schema, - setStateParams: make(map[int64]*batchCompleterSetState), - setStateStartTimes: make(map[int64]time.Time), - subscribeCh: subscribeCh, + backlogWaitThreshold: backlogWaitThreshold, + batchReadyChan: make(chan struct{}, 1), + completionMaxSize: completionMaxSize, + exec: exec, + maxBacklog: maxBacklog, + pilot: pilot, + schema: schema, + setStateParams: make(map[int64]*batchCompleterSetState), + setStateStartTimes: make(map[int64]time.Time), + subscribeCh: subscribeCh, }) } @@ -330,6 +335,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error { } return + case <-c.batchReadyChan: case <-ticker.C: } @@ -340,7 +346,7 @@ func (c *BatchCompleter) Start(ctx context.Context) error { // multiple of 5. So, jobs will be completed every 250ms even if the // threshold hasn't been met. const batchCompleterStartThreshold = 100 - if backlogSize() < min(c.maxBacklog, batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 { + if backlogSize() < min(c.backlogWaitThresholdEffective(), batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 { continue } @@ -391,6 +397,16 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return nil } + handleBatchError := func(err error) error { + if isNonRetryableCompleterError(err) { + c.releaseBacklogWaitIfReady(ctx) + return err + } + + c.requeueBatch(ctx, setStateBatch, setStateStartTimes) + return err + } + // Complete a sub-batch with retries. Also helps reduce visual noise and // increase readability of loop below. 
completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { @@ -465,7 +481,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { } jobRowsSubBatch, err := completeSubBatch(subBatch) if err != nil { - return err + return handleBatchError(err) } jobRows = append(jobRows, jobRowsSubBatch...) } @@ -473,7 +489,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { var err error jobRows, err = completeSubBatch(params) if err != nil { - return err + return handleBatchError(err) } } @@ -494,7 +510,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { c.setStateParamsMu.Lock() defer c.setStateParamsMu.Unlock() - if c.waitOnBacklogWaiting && len(c.setStateParams) < c.maxBacklog { + if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() { c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") close(c.waitOnBacklogChan) c.waitOnBacklogWaiting = false @@ -504,6 +520,41 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error { return nil } +func (c *BatchCompleter) releaseBacklogWaitIfReady(ctx context.Context) { + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + if c.waitOnBacklogWaiting && len(c.setStateParams) < c.backlogResumeThreshold() { + c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") + close(c.waitOnBacklogChan) + c.waitOnBacklogWaiting = false + } +} + +func (c *BatchCompleter) requeueBatch(ctx context.Context, setStateBatch map[int64]*batchCompleterSetState, setStateStartTimes map[int64]time.Time) { + c.setStateParamsMu.Lock() + for id, setState := range setStateBatch { + if _, exists := c.setStateParams[id]; exists { + continue + } + c.setStateParams[id] = setState + c.setStateStartTimes[id] = setStateStartTimes[id] + } + backlogSize := len(c.setStateParams) + if c.waitOnBacklogWaiting && backlogSize < c.backlogResumeThreshold() { + 
c.Logger.DebugContext(ctx, c.Name+": Disabling waitOnBacklog; ready to complete more jobs") + close(c.waitOnBacklogChan) + c.waitOnBacklogWaiting = false + } + c.setStateParamsMu.Unlock() + + if backlogSize >= c.batchReadyThreshold() { + c.signalBatchReady() + } + + c.Logger.DebugContext(ctx, c.Name+": Requeued failed batch of job(s)", "num_jobs", len(setStateBatch)) +} + func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { now := c.Time.Now() // If we've built up too much of a backlog because the completer's fallen @@ -512,22 +563,60 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta c.waitOrInitBacklogChannel(ctx) c.setStateParamsMu.Lock() - defer c.setStateParamsMu.Unlock() statsSnapshot := *stats c.setStateParams[params.ID] = &batchCompleterSetState{params, &statsSnapshot} c.setStateStartTimes[params.ID] = now + backlogSize := len(c.setStateParams) + c.setStateParamsMu.Unlock() + + if backlogSize >= c.batchReadyThreshold() { + c.signalBatchReady() + } return nil } +// backlogResumeThreshold returns the low-water mark below which waiting +// completers are released. Keeping this below the wait threshold avoids rapidly +// cycling between waiting and not waiting when the completer is near capacity. +func (c *BatchCompleter) backlogResumeThreshold() int { + return max(c.backlogWaitThresholdEffective()/2, 1) +} + +// backlogWaitThresholdEffective returns the backlog size at which new +// completions should wait for the batch completer to catch up. It's capped at +// maxBacklog so tests and future configuration can't set a normal wait +// threshold beyond the emergency warning threshold. 
+func (c *BatchCompleter) backlogWaitThresholdEffective() int { + if c.backlogWaitThreshold <= 0 { + return c.maxBacklog + } + return min(c.backlogWaitThreshold, c.maxBacklog) +} + +// batchReadyThreshold returns the backlog size at which the run loop should be +// nudged to process a batch immediately instead of waiting for its next ticker. +// It aims for a full database batch while still respecting low test thresholds. +func (c *BatchCompleter) batchReadyThreshold() int { + return min(c.completionMaxSize, c.backlogWaitThresholdEffective()) +} + +func (c *BatchCompleter) signalBatchReady() { + select { + case c.batchReadyChan <- struct{}{}: + default: + } +} + func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { c.setStateParamsMu.RLock() var ( backlogSize = len(c.setStateParams) waitChan = c.waitOnBacklogChan waiting = c.waitOnBacklogWaiting + waitAt = c.backlogWaitThresholdEffective() ) c.setStateParamsMu.RUnlock() @@ -536,33 +625,52 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { return } - // Not at max backlog. A little raciness is allowed here: multiple + // Not at the wait threshold. A little raciness is allowed here: multiple // goroutines may have acquired the read lock above and seen a size under // limit, but with all allowed to continue it could put the backlog over its - // maximum. The backlog will only be nominally over because generally max - // backlog >> max workers, so consider this okay. - if backlogSize < c.maxBacklog { + // wait threshold. The backlog will only be nominally over because generally + // the wait threshold >> max workers, so consider this okay. + if backlogSize < waitAt { return } c.setStateParamsMu.Lock() - defer c.setStateParamsMu.Unlock() - - // Check once more if another process has already started waiting (it's - // possible for multiple to race between the acquiring the lock above). 
If - // so, we fall through and allow this insertion to happen, even though it - // might bring the batch slightly over limit, because arranging the locks - // otherwise would get complicated. if c.waitOnBacklogWaiting { + waitChan := c.waitOnBacklogChan + c.setStateParamsMu.Unlock() + <-waitChan + return + } + backlogSize = len(c.setStateParams) + waitAt = c.backlogWaitThresholdEffective() + if backlogSize < waitAt { + c.setStateParamsMu.Unlock() return } - // Tell all future insertions to start waiting. This one is allowed to fall - // through and succeed even though it may bring the batch a little over - // limit. + // Tell future insertions to start waiting. This caller falls through so + // there is guaranteed to be a pending completion for a future batch to + // process, even if the current in-flight batch fails. c.waitOnBacklogChan = make(chan struct{}) c.waitOnBacklogWaiting = true - c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold", "max_backlog", c.maxBacklog) + if backlogSize >= c.maxBacklog { + c.Logger.WarnContext(ctx, c.Name+": Hit maximum backlog; completions will wait until below threshold", + "backlog_size", backlogSize, + "backlog_wait_threshold", waitAt, + "max_backlog", c.maxBacklog, + ) + } else { + c.Logger.DebugContext(ctx, c.Name+": Applying completion backlog pressure", + "backlog_resume_threshold", c.backlogResumeThreshold(), + "backlog_size", backlogSize, + "backlog_wait_threshold", waitAt, + ) + } + c.setStateParamsMu.Unlock() +} + +func isNonRetryableCompleterError(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, riverdriver.ErrClosedPool) } // As configured, total time asleep from initial attempt is ~7 seconds (1 + 2 + @@ -586,13 +694,8 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer retVal, err := retryFunc(ctx) if err != nil { - // A cancelled context will never succeed, return immediately. 
- if errors.Is(err, context.Canceled) { - return defaultVal, err - } - - // A closed pool will never succeed, return immediately. - if errors.Is(err, riverdriver.ErrClosedPool) { + // A cancelled context or a closed pool will never succeed. + if isNonRetryableCompleterError(err) { return defaultVal, err } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 18d9f465..54e74623 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -459,7 +459,8 @@ func TestBatchCompleter(t *testing.T) { t.Parallel() completer, bundle := setup(t) - completer.maxBacklog = 10 // set to something artificially low + completer.backlogWaitThreshold = 10 // set to something artificially low + completer.completionMaxSize = 10 startCompleter(ctx, t, completer) jobUpdateChan := make(chan CompleterJobUpdated, 100) @@ -487,6 +488,195 @@ func TestBatchCompleter(t *testing.T) { }) } +func TestBatchCompleter_BackpressureBeforeMaxBacklog(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + rows := make([]*rivertype.JobRow, len(params.ID)) + for i := range params.ID { + rows[i] = &rivertype.JobRow{ + ID: params.ID[i], + State: params.State[i], + } + } + return rows, nil + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.completionMaxSize = 2 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, 
&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + require.Eventually(t, func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + select { + case err := <-errCh: + require.NoError(t, err) + require.FailNow(t, "expected completion to wait for backlog pressure to clear") + default: + } + + require.NoError(t, completer.handleBatch(ctx)) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_BackpressureReleasedAfterNonRetryableCompletionFailure(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + return nil, context.Canceled + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + require.Eventually(t, 
func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + select { + case err := <-errCh: + require.NoError(t, err) + require.FailNow(t, "expected completion to wait for backlog pressure to clear") + default: + } + + require.ErrorIs(t, completer.handleBatch(ctx), context.Canceled) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_BackpressureRequeuesBatchAfterCompletionFailure(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + expectedErr := errors.New("error from batch completion") + var numCalls int + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + numCalls++ + if numCalls <= numRetries { + return nil, expectedErr + } + + rows := make([]*rivertype.JobRow, len(params.ID)) + for i := range params.ID { + rows[i] = &rivertype.JobRow{ + ID: params.ID[i], + State: params.State[i], + } + } + return rows, nil + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.backlogWaitThreshold = 2 + completer.completionMaxSize = 10 + completer.disableSleep = true + completer.maxBacklog = 100 + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, 
&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + errCh := make(chan error, 1) + go func() { + errCh <- completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(4, time.Now(), nil)) + }() + + require.Eventually(t, func() bool { + completer.setStateParamsMu.RLock() + defer completer.setStateParamsMu.RUnlock() + return completer.waitOnBacklogWaiting + }, riversharedtest.WaitTimeout(), 10*time.Millisecond) + + require.ErrorIs(t, completer.handleBatch(ctx), expectedErr) + + completer.setStateParamsMu.RLock() + require.Len(t, completer.setStateParams, 3) + completer.setStateParamsMu.RUnlock() + + require.NoError(t, completer.handleBatch(ctx)) + require.NoError(t, riversharedtest.WaitOrTimeout(t, errCh)) +} + +func TestBatchCompleter_NonRetryableCompletionFailureDoesNotRequeueBatch(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + for _, tt := range []struct { + err error + name string + }{ + {err: context.Canceled, name: "ContextCanceled"}, + {err: riverdriver.ErrClosedPool, name: "ErrClosedPool"}, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var numCalls int + execMock := &partialExecutorMock{} + execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) { + numCalls++ + return nil, tt.err + } + + subscribeCh := make(chan []CompleterJobUpdated, 1) + completer := NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), "", execMock, &riverpilot.StandardPilot{}, subscribeCh) + completer.disableSleep = true + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(2, time.Now(), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, 
riverdriver.JobSetStateCompleted(3, time.Now(), nil))) + + require.ErrorIs(t, completer.handleBatch(ctx), tt.err) + require.Equal(t, 1, numCalls) + + completer.setStateParamsMu.RLock() + require.Empty(t, completer.setStateParams) + require.Empty(t, completer.setStateStartTimes) + completer.setStateParamsMu.RUnlock() + }) + } +} + func TestBatchCompleter_JobStatsSnapshotsPerUpdate(t *testing.T) { t.Parallel()