diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index 3197fafb4..415003091 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -15,6 +15,8 @@ package workflow import ( "errors" + "slices" + "strings" "github.com/dapr/cli/pkg/workflow" "github.com/dapr/kit/signals" @@ -22,11 +24,12 @@ import ( ) var ( - flagPurgeOlderThan string - flagPurgeAll bool - flagPurgeConn *connFlag - flagPurgeForce bool - schedulerNamespace string + flagPurgeOlderThan string + flagPurgeAll bool + flagPurgeConn *connFlag + flagPurgeForce bool + flagPurgeFilterStatus string + schedulerNamespace string ) var PurgeCmd = &cobra.Command{ @@ -41,6 +44,9 @@ var PurgeCmd = &cobra.Command{ return errors.New("no arguments are accepted when using purge all flags") } default: + if cmd.Flags().Changed("all-filter-status") { + return errors.New("--all-filter-status can only be used with --all-older-than") + } if len(args) == 0 { return errors.New("one or more workflow instance ID arguments are required when not using purge all flags") } @@ -75,14 +81,38 @@ var PurgeCmd = &cobra.Command{ } } + if cmd.Flags().Changed("all-filter-status") { + opts.AllFilterStatus = &flagPurgeFilterStatus + } + return workflow.Purge(ctx, opts) }, } +var purgeFilterStatuses = workflow.RuntimeStatuses + func init() { PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.") PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.") + PurgeCmd.Flags().StringVar(&flagPurgeFilterStatus, "all-filter-status", "", "Filter purge to only workflow instances with the given runtime status. Must be used with --all-older-than. One of "+strings.Join(purgeFilterStatuses, ", ")) PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all") + PurgeCmd.MarkFlagsMutuallyExclusive("all-filter-status", "all") + + pre := PurgeCmd.PreRunE + PurgeCmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if cmd.Flags().Changed("all-filter-status") { + if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) { + return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", ")) + } + if !slices.Contains(workflow.TerminalStatuses, flagPurgeFilterStatus) && !flagPurgeForce { + return errors.New("--force is required when using --all-filter-status with a non-terminal status (" + flagPurgeFilterStatus + ")") + } + } + if pre != nil { + return pre(cmd, args) + } + return nil + } PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.") PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set") diff --git a/cmd/workflow/purge_test.go b/cmd/workflow/purge_test.go new file mode 100644 index 000000000..9a30ae9ed --- /dev/null +++ b/cmd/workflow/purge_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "testing" + + "github.com/dapr/cli/pkg/workflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPurgeFilterStatuses(t *testing.T) { + assert.Equal(t, workflow.RuntimeStatuses, purgeFilterStatuses) +} + +func TestPurgeCmdFlags(t *testing.T) { + t.Run("all-filter-status flag is registered", func(t *testing.T) { + f := PurgeCmd.Flags().Lookup("all-filter-status") + assert.NotNil(t, f) + assert.Equal(t, "string", f.Value.Type()) + assert.Contains(t, f.Usage, "Must be used with --all-older-than") + }) + + t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) { + WorkflowCmd.SetArgs([]string{"purge", "--all", "--all-filter-status", "COMPLETED"}) + err := WorkflowCmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "if any flags in the group [all-filter-status all] are set none of the others can be") + }) + + t.Run("all-older-than flag is registered", func(t *testing.T) { + f := PurgeCmd.Flags().Lookup("all-older-than") + assert.NotNil(t, f) + }) + + t.Run("non-terminal status without force errors", func(t *testing.T) { + WorkflowCmd.SetArgs([]string{"purge", "--all-older-than", "1s", "--all-filter-status", "RUNNING"}) + err := WorkflowCmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "--force is required when using --all-filter-status with a non-terminal status") + }) +} diff --git a/cmd/workflow/workflow.go b/cmd/workflow/workflow.go index bbfb91ff5..f16de974d 100644 --- a/cmd/workflow/workflow.go +++ b/cmd/workflow/workflow.go @@ -148,16 +148,7 @@ func filterCmd(cmd *cobra.Command) *workflow.Filter { status string maxAge string - listStatuses = []string{ - "RUNNING", - "COMPLETED", - "CONTINUED_AS_NEW", - "FAILED", - "CANCELED", - "TERMINATED", - "PENDING", - "SUSPENDED", - } + listStatuses = workflow.RuntimeStatuses ) cmd.Flags().StringVarP(&name, "filter-name", "w", "", "Filter only the workflows with the given name") diff --git a/pkg/workflow/list.go b/pkg/workflow/list.go index 477512e3c..9fbbd5376 100644 --- a/pkg/workflow/list.go +++ b/pkg/workflow/list.go @@ -44,6 +44,28 @@ type Filter struct { Terminal bool } +// RuntimeStatuses is the canonical list of workflow runtime statuses. +var RuntimeStatuses = []string{ + "RUNNING", + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", + "PENDING", + "SUSPENDED", +} + +// TerminalStatuses is the subset of RuntimeStatuses that represent terminal +// (completed) workflow states. +var TerminalStatuses = []string{ + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", +} + type ListOutputShort struct { Namespace string `csv:"-" json:"namespace" yaml:"namespace"` AppID string `csv:"-" json:"appID" yaml:"appID"` diff --git a/pkg/workflow/purge.go b/pkg/workflow/purge.go index 711473009..7a9c220c1 100644 --- a/pkg/workflow/purge.go +++ b/pkg/workflow/purge.go @@ -32,6 +32,7 @@ type PurgeOptions struct { AppID string InstanceIDs []string AllOlderThan *time.Time + AllFilterStatus *string All bool Force bool @@ -39,12 +40,28 @@ type PurgeOptions struct { TableName *string } +// BuildPurgeFilter constructs the Filter used when listing workflow instances +// for bulk purge. When AllFilterStatus is set, it filters by that status +// instead of using the default terminal-only filter. +func BuildPurgeFilter(allFilterStatus *string) Filter { + filter := Filter{ + Terminal: true, + } + if allFilterStatus != nil { + filter.Terminal = false + filter.Status = allFilterStatus + } + return filter +} + func Purge(ctx context.Context, opts PurgeOptions) error { var toPurge []string if len(opts.InstanceIDs) > 0 { toPurge = opts.InstanceIDs } else { + filter := BuildPurgeFilter(opts.AllFilterStatus) + var list []*ListOutputWide var err error list, err = ListWide(ctx, ListOptions{ @@ -53,9 +70,7 @@ func Purge(ctx context.Context, opts PurgeOptions) error { AppID: opts.AppID, ConnectionString: opts.ConnectionString, TableName: opts.TableName, - Filter: Filter{ - Terminal: true, - }, + Filter: filter, }) if err != nil { return err diff --git a/pkg/workflow/purge_test.go b/pkg/workflow/purge_test.go new file mode 100644 index 000000000..120d17a08 --- /dev/null +++ b/pkg/workflow/purge_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "testing" + + "github.com/dapr/kit/ptr" + "github.com/stretchr/testify/assert" +) + +func TestBuildPurgeFilter(t *testing.T) { + t.Run("nil status uses terminal filter", func(t *testing.T) { + filter := BuildPurgeFilter(nil) + assert.True(t, filter.Terminal) + assert.Nil(t, filter.Status) + }) + + t.Run("with status uses status filter instead of terminal", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("FAILED")) + assert.False(t, filter.Terminal) + assert.NotNil(t, filter.Status) + assert.Equal(t, "FAILED", *filter.Status) + }) + + t.Run("with COMPLETED status", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("COMPLETED")) + assert.False(t, filter.Terminal) + assert.Equal(t, "COMPLETED", *filter.Status) + }) + + t.Run("with RUNNING status", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("RUNNING")) + assert.False(t, filter.Terminal) + assert.Equal(t, "RUNNING", *filter.Status) + }) +} diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index a4698e7d1..5a308be76 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -319,6 +319,151 @@ func TestWorkflowPurge(t *testing.T) { assert.NotContains(t, output, "purge-older") }) + t.Run("purge older than with filter status only purges matching status", func(t *testing.T) { + // Create one workflow that will complete (SimpleWorkflow) and one that + // will be terminated (LongWorkflow) so they have different statuses. + output, err := cmdWorkflowRun(appID, "SimpleWorkflow", + "--instance-id=filter-completed") + require.NoError(t, err, output) + + output, err = cmdWorkflowRun(appID, "LongWorkflow", + "--instance-id=filter-terminated") + require.NoError(t, err, output) + + // Wait for SimpleWorkflow to complete. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "filter-completed" { + assert.Equal(c, "COMPLETED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "filter-completed not found") + }, 30*time.Second, 500*time.Millisecond) + + // Terminate one so we have two different terminal statuses. + _, err = cmdWorkflowTerminate(appID, "filter-terminated") + require.NoError(t, err) + + // Wait for the terminate to take effect. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "filter-terminated" { + assert.Equal(c, "TERMINATED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "filter-terminated not found") + }, 30*time.Second, 500*time.Millisecond) + + // Purge only COMPLETED instances older than 1s. + output, err = cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "COMPLETED") + require.NoError(t, err, output) + assert.Contains(t, output, `Purged workflow instance "filter-completed"`) + assert.NotContains(t, output, "filter-terminated") + + // Verify filter-terminated still exists. + output, err = cmdWorkflowList(appID, "-o", "json", redisConnString) + require.NoError(t, err, output) + assert.NotContains(t, output, "filter-completed") + assert.Contains(t, output, "filter-terminated") + + // Clean up the remaining instance. + t.Cleanup(func() { + _, err := cmdWorkflowPurge(appID, redisConnString, "filter-terminated") + assert.NoError(t, err) + }) + }) + + t.Run("purge older than with filter status TERMINATED", func(t *testing.T) { + output, err := cmdWorkflowRun(appID, "SimpleWorkflow", + "--instance-id=fs-completed") + require.NoError(t, err, output) + + output, err = cmdWorkflowRun(appID, "LongWorkflow", + "--instance-id=fs-terminated") + require.NoError(t, err, output) + + // Wait for SimpleWorkflow to complete. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "fs-completed" { + assert.Equal(c, "COMPLETED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "fs-completed not found") + }, 30*time.Second, 500*time.Millisecond) + + _, err = cmdWorkflowTerminate(appID, "fs-terminated") + require.NoError(t, err) + + // Wait for the terminate to take effect. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "fs-terminated" { + assert.Equal(c, "TERMINATED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "fs-terminated not found") + }, 30*time.Second, 500*time.Millisecond) + + // Purge only TERMINATED instances older than 1s. + output, err = cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "TERMINATED") + require.NoError(t, err, output) + assert.Contains(t, output, `Purged workflow instance "fs-terminated"`) + assert.NotContains(t, output, "fs-completed") + + // Verify fs-completed still exists. + output, err = cmdWorkflowList(appID, "-o", "json", redisConnString) + require.NoError(t, err, output) + assert.Contains(t, output, "fs-completed") + assert.NotContains(t, output, "fs-terminated") + + // Clean up. + t.Cleanup(func() { + _, err := cmdWorkflowPurge(appID, redisConnString, "fs-completed") + assert.NoError(t, err) + }) + }) + + t.Run("all-filter-status without all-older-than errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all-filter-status", "COMPLETED") + require.Error(t, err) + }) + + t.Run("all-filter-status with invalid value errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "INVALID") + require.Error(t, err) + }) + + t.Run("all-filter-status with all flag errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all", "--all-filter-status", "COMPLETED") + require.Error(t, err) + }) + t.Run("also purge scheduler", func(t *testing.T) { output, err := cmdWorkflowRun(appID, "EventWorkflow", "--instance-id=also-sched")