diff --git a/CHANGELOG.md b/CHANGELOG.md index b9e6dd94..75059726 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ river migrate-get --database-url sqlite:// --version 6 --down > river7.down.sql - Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224). - Change SQLite driver operations over to use bulk inserts where possible now that sqlc has better support for `json_each`. [PR #1276](https://github.com/riverqueue/river/pull/1276) - Detect duplicate step names across `river.ResumableStep` and return a validation error. [PR #1281](https://github.com/riverqueue/river/pull/1281) +- Jobs that didn't finish in time organically while a client was stopping and had to have their context cancelled no longer have this cancellation counted as an error. `attempt` is reset to the number it was before the job started working, `errors` is left unchanged, and `state` is made `available` so jobs are eligible to be retried immediately. [PR #1290](https://github.com/riverqueue/river/pull/1290) ### Fixed diff --git a/client_test.go b/client_test.go index 9352dcbb..1ea4562d 100644 --- a/client_test.go +++ b/client_test.go @@ -2581,6 +2581,67 @@ func Test_Client_SoftStopTimeout(t *testing.T) { } }) + t.Run("ErroringJobGetsFreshAttempt", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.SoftStopTimeout = 100 * time.Millisecond + + firstRunDoneChan := make(chan struct{}) + jobStartedChan := make(chan int64, 2) + var runCount atomic.Int32 + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + jobStartedChan <- job.ID + switch runCount.Add(1) { + case 1: + <-ctx.Done() + close(firstRunDoneChan) + return ctx.Err() + default: + return errors.New("real job error") + } + })) + + client := runNewTestClient(ctx, t, config) + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 2}) + require.NoError(t, err) + + jobID := riversharedtest.WaitOrTimeout(t, jobStartedChan) + require.Equal(t, insertRes.Job.ID, jobID) + + require.NoError(t, client.Stop(ctx)) + riversharedtest.WaitOrTimeout(t, firstRunDoneChan) + + jobAfter, err := client.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: jobID, Schema: client.config.Schema}) + require.NoError(t, err) + require.Equal(t, 0, jobAfter.Attempt) + require.Nil(t, jobAfter.FinalizedAt) + require.Equal(t, 2, jobAfter.MaxAttempts) + require.Equal(t, rivertype.JobStateAvailable, jobAfter.State) + require.WithinDuration(t, time.Now(), jobAfter.ScheduledAt, 2*time.Second) + require.Empty(t, jobAfter.Errors) + + require.NoError(t, client.Start(ctx)) + + jobID = riversharedtest.WaitOrTimeout(t, jobStartedChan) + require.Equal(t, insertRes.Job.ID, jobID) + + var jobAfterRealError *rivertype.JobRow + require.Eventually(t, func() bool { + var err error + jobAfterRealError, err = client.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: jobID, Schema: client.config.Schema}) + require.NoError(t, err) + return jobAfterRealError.State != rivertype.JobStateRunning + }, 5*time.Second, 10*time.Millisecond) + + require.Equal(t, 1, jobAfterRealError.Attempt) + require.Equal(t, rivertype.JobStateRetryable, jobAfterRealError.State) + require.Len(t, jobAfterRealError.Errors, 1) + require.Equal(t, "real job error", jobAfterRealError.Errors[0].Error) + require.Less(t, time.Until(jobAfterRealError.ScheduledAt), 3*time.Second) + }) + t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) { t.Parallel() diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 18d9f465..6c8e7a45 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -779,7 +779,7 @@ func testCompleter[TCompleter JobCompleter]( require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"), nil))) require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now(), nil))) require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"), nil))) - require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"), nil))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), nil, []byte("{}"), nil))) require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"), nil))) require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10, nil))) require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10, nil))) @@ -1128,7 +1128,7 @@ func benchmarkCompleter( require.NoError(b, err) case 3: - err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil)) + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), nil, []byte("{}"), nil)) require.NoError(b, err) case 4: diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index dab44f39..18d191e8 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -18,9 +18,11 @@ import ( "github.com/riverqueue/river/internal/jobcompleter" "github.com/riverqueue/river/internal/jobstats" "github.com/riverqueue/river/internal/middlewarelookup" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/workunit" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" + "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivertype" ) @@ -423,6 +425,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, cancelJob bool cancelErr *rivertype.JobCancelError ) + softStopped := isSoftStopCancelError(ctx, res.Err) logAttrs := []any{ slog.String("error", res.ErrorStr()), @@ -434,6 +437,8 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, case errors.As(res.Err, &cancelErr): cancelJob = true e.Logger.DebugContext(ctx, e.Name+": Job cancelled explicitly", logAttrs...) + case softStopped: + e.Logger.InfoContext(ctx, e.Name+": Job stopped due to client shutdown; retrying", logAttrs...) case res.Err != nil: if jobRow.Attempt >= jobRow.MaxAttempts { e.Logger.InfoContext(ctx, e.Name+": Job errored", logAttrs...) @@ -444,11 +449,21 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, e.Logger.InfoContext(ctx, e.Name+": Job panicked", logAttrs...) } - if e.ErrorHandler != nil && !cancelJob { + if e.ErrorHandler != nil && !cancelJob && !softStopped { // Error handlers also have an opportunity to cancel the job. cancelJob = e.invokeErrorHandler(ctx, res) } + now := e.Time.Now() + + if softStopped { + params := riverdriver.JobSetStateErrorAvailable(jobRow.ID, now, ptrutil.Ptr(max(jobRow.Attempt-1, 0)), nil, metadataUpdates) + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { + e.Logger.ErrorContext(ctx, e.Name+": Failed to make soft-stopped job available", logAttrs...) + } + return + } + attemptErr := rivertype.AttemptError{ At: e.start, Attempt: jobRow.Attempt, @@ -462,8 +477,6 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, return } - now := e.Time.Now() - if cancelJob { if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(jobRow.ID, now, errData, metadataUpdates)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...) @@ -503,7 +516,7 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, // `available` if their retry was smaller than the scheduler's run interval. var params *riverdriver.JobSetStateIfRunningParams if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval { - params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) + params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, nil, errData, metadataUpdates) } else { params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) } @@ -512,6 +525,15 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, } } +// isSoftStopCancelError reports whether a worker returned because the client +// was stopping and cancelled its job context. The context cause distinguishes +// client stop cancellation from ordinary worker cancellation or timeouts. +func isSoftStopCancelError(ctx context.Context, err error) bool { + return err != nil && + errors.Is(context.Cause(ctx), rivercommon.ErrStop) && + (errors.Is(err, context.Canceled) || errors.Is(err, rivercommon.ErrStop)) +} + type withJobsAndErrorsByID interface { ErrorsByID() map[int64]error Jobs() []*rivertype.JobRow diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 1674fa7e..85734040 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -363,6 +363,42 @@ func TestJobExecutor_Execute(t *testing.T) { require.Equal(t, rivertype.JobStateDiscarded, job.State) }) + // "Decrements attempt" means restoring attempt to the value it had before + // JobGetAvailable incremented it for this run. + t.Run("SoftStopCancelMakesJobAvailableAndDecrementsAttempt", func(t *testing.T) { + t.Parallel() + + executor, bundle := setup(t) + + bundle.jobRow.Attempt = bundle.jobRow.MaxAttempts + _, err := bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{ + ID: bundle.jobRow.ID, + AttemptDoUpdate: true, + Attempt: bundle.jobRow.Attempt, + }) + require.NoError(t, err) + + workCtx, cancel := context.WithCancelCause(ctx) + cancel(rivercommon.ErrStop) + + executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return context.Canceled }, nil).MakeUnit(bundle.jobRow) + + executor.Execute(workCtx) + riversharedtest.WaitOrTimeout(t, bundle.updateCh) + + job, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ + ID: bundle.jobRow.ID, + Schema: "", + }) + require.NoError(t, err) + require.Nil(t, job.FinalizedAt) + require.Equal(t, bundle.jobRow.Attempt-1, job.Attempt) + require.Equal(t, bundle.jobRow.MaxAttempts, job.MaxAttempts) + require.Equal(t, rivertype.JobStateAvailable, job.State) + require.WithinDuration(t, time.Now(), job.ScheduledAt, 2*time.Second) + require.Empty(t, job.Errors) + }) + t.Run("JobCancelErrorCancelsJobEvenWithRemainingAttempts", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 7d92ab32..f54e1de3 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -588,8 +588,11 @@ func JobSetStateDiscarded(id int64, finalizedAt time.Time, errData []byte, metad } } -func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams { +// JobSetStateErrorAvailable makes an errored job immediately available. +// attempt can be set to nil to leave attempt unchanged. +func JobSetStateErrorAvailable(id int64, scheduledAt time.Time, attempt *int, errData []byte, metadataUpdates []byte) *JobSetStateIfRunningParams { return &JobSetStateIfRunningParams{ + Attempt: attempt, ID: id, ErrData: errData, MetadataDoMerge: len(metadataUpdates) > 0, diff --git a/riverdriver/river_driver_interface_test.go b/riverdriver/river_driver_interface_test.go index 978e299c..5976eda8 100644 --- a/riverdriver/river_driver_interface_test.go +++ b/riverdriver/river_driver_interface_test.go @@ -131,14 +131,36 @@ func TestJobSetStateDiscarded(t *testing.T) { func TestJobSetStateErrorAvailable(t *testing.T) { t.Parallel() + t.Run("Attempt", func(t *testing.T) { + t.Parallel() + + id := int64(4) + scheduledAt := time.Now().Truncate(time.Second) + errData := []byte("error available") + attempt := 3 + result := JobSetStateErrorAvailable(id, scheduledAt, &attempt, errData, nil) + require.Equal(t, id, result.ID) + require.NotNil(t, result.Attempt) + require.Equal(t, attempt, *result.Attempt) + require.Equal(t, errData, result.ErrData) + require.False(t, result.MetadataDoMerge) + require.Nil(t, result.MetadataUpdates) + require.NotNil(t, result.ScheduledAt) + require.True(t, result.ScheduledAt.Equal(scheduledAt)) + require.Empty(t, result.Schema) + require.False(t, result.Snoozed) + require.Equal(t, rivertype.JobStateAvailable, result.State) + }) + t.Run("EmptyMetadata", func(t *testing.T) { t.Parallel() id := int64(4) scheduledAt := time.Now().Truncate(time.Second) errData := []byte("error available") - result := JobSetStateErrorAvailable(id, scheduledAt, errData, nil) + result := JobSetStateErrorAvailable(id, scheduledAt, nil, errData, nil) require.Equal(t, id, result.ID) + require.Nil(t, result.Attempt) require.Equal(t, errData, result.ErrData) require.False(t, result.MetadataDoMerge) require.Nil(t, result.MetadataUpdates) @@ -156,8 +178,9 @@ func TestJobSetStateErrorAvailable(t *testing.T) { scheduledAt := time.Now().Truncate(time.Second) errData := []byte("error available") metadata := []byte(`{"key": "value"}`) - result := JobSetStateErrorAvailable(id, scheduledAt, errData, metadata) + result := JobSetStateErrorAvailable(id, scheduledAt, nil, errData, metadata) require.Equal(t, id, result.ID) + require.Nil(t, result.Attempt) require.True(t, result.MetadataDoMerge) require.Equal(t, metadata, result.MetadataUpdates) require.NotNil(t, result.ScheduledAt) diff --git a/riverdriver/riverdrivertest/job_update.go b/riverdriver/riverdrivertest/job_update.go index e2d22210..e768a0d7 100644 --- a/riverdriver/riverdrivertest/job_update.go +++ b/riverdriver/riverdrivertest/job_update.go @@ -697,6 +697,44 @@ func exerciseJobUpdate[TTx any](ctx context.Context, t *testing.T, executorWithT require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace) }) + t.Run("SetsARunningJobToAvailableWithUpdatedAttempt", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + attempt := 2 + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key"), + }) + + params := riverdriver.JobSetStateErrorAvailable(job.ID, now, &attempt, makeErrPayload(t, now), nil) + jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(params)) + require.NoError(t, err) + jobAfter := jobsAfter[0] + require.Equal(t, attempt, jobAfter.Attempt) + require.Equal(t, rivertype.JobStateAvailable, jobAfter.State) + require.Equal(t, 3, jobAfter.MaxAttempts) + require.WithinDuration(t, now, jobAfter.ScheduledAt, time.Microsecond) + + jobUpdated, err := exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: ""}) + require.NoError(t, err) + require.Equal(t, attempt, jobUpdated.Attempt) + require.Equal(t, rivertype.JobStateAvailable, jobUpdated.State) + require.Equal(t, 3, jobUpdated.MaxAttempts) + require.Equal(t, "unique-key", string(jobUpdated.UniqueKey)) + + require.Len(t, jobAfter.Errors, 1) + require.Equal(t, now, jobAfter.Errors[0].At) + require.Equal(t, 1, jobAfter.Errors[0].Attempt) + require.Equal(t, "fake error", jobAfter.Errors[0].Error) + require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace) + }) + t.Run("DoesNotTouchAlreadyRetryableJobWithNoMetadataUpdates", func(t *testing.T) { t.Parallel()