diff --git a/e2e/add_node_data_safety_test.go b/e2e/add_node_data_safety_test.go new file mode 100644 index 00000000..df56b4ba --- /dev/null +++ b/e2e/add_node_data_safety_test.go @@ -0,0 +1,115 @@ +//go:build e2e_test + +package e2e + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/jackc/pgx/v5" + controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestAddNodeOriginAdvanced verifies that after adding a node the replication +// origin on the new subscriber has been advanced past 0/0. A zeroed origin +// causes the apply worker to start from the beginning of WAL, producing +// duplicate-key errors or silently overwriting rows. +// +// Covers: Change 1 — EnsureReplicationOriginExists + AdvanceReplicationOrigin +// wired into ReplicationOriginAdvanceResource. +func TestAddNodeOriginAdvanced(t *testing.T) { + t.Parallel() + + const ( + username = "admin" + password = "password" + dbName = "origin_adv_db" + ) + + ctx, cancel := context.WithTimeout(t.Context(), 7*time.Minute) + defer cancel() + + hostIDs := fixture.HostIDs() + db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ + Spec: &controlplane.DatabaseSpec{ + DatabaseName: dbName, + Port: pointerTo(0), + PatroniPort: pointerTo(0), + DatabaseUsers: []*controlplane.DatabaseUserSpec{{ + Username: username, + Password: pointerTo(password), + DbOwner: pointerTo(true), + Attributes: []string{"LOGIN", "SUPERUSER"}, + }}, + Nodes: []*controlplane.DatabaseNodeSpec{ + {Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[0])}}, + {Name: "n2", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[1])}}, + }, + }, + }) + + // Write rows on n2 so its WAL position is meaningfully ahead of the slot's + // consistent_point. This gives the origin advancement a non-trivial LSN. 
+ n2Opts := ConnectionOptions{ + Matcher: And(WithNode("n2"), WithRole("primary")), + Username: username, + Password: password, + } + db.WithConnection(ctx, n2Opts, t, func(conn *pgx.Conn) { + _, err := conn.Exec(ctx, `CREATE TABLE origin_probe (id INT PRIMARY KEY, v TEXT)`) + require.NoError(t, err) + + for i := 1; i <= 100; i++ { + _, err = conn.Exec(ctx, `INSERT INTO origin_probe VALUES ($1, $2)`, i, fmt.Sprintf("r%d", i)) + require.NoError(t, err) + } + }) + + // Add n3 with n1 as source. + db.Spec.Nodes = append(db.Spec.Nodes, &controlplane.DatabaseNodeSpec{ + Name: "n3", + HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[2])}, + SourceNode: pointerTo("n1"), + }) + require.NoError(t, db.Update(ctx, UpdateOptions{Spec: db.Spec})) + + // The replication slot spk_origin_adv_db_n2_sub_n2_n3 lives on n2. + // The origin with the same name lives on n3 (subscriber side). + slotName := e2eReplicationSlotName(dbName, "n2", "n3") + + n3Opts := ConnectionOptions{ + Matcher: And(WithNode("n3"), WithRole("primary")), + Username: username, + Password: password, + } + db.WithConnection(ctx, n3Opts, t, func(conn *pgx.Conn) { + // Query progress; COALESCE returns '0/0' when the origin is absent or + // has never been advanced, so a single assert covers both failure modes. + var lsn string + err := conn.QueryRow(ctx, ` + SELECT COALESCE( + (SELECT pg_replication_origin_progress($1, false)::text + FROM pg_replication_origin WHERE roname = $1), + '0/0' + )`, slotName, + ).Scan(&lsn) + require.NoError(t, err) + + assert.NotEqual(t, "0/0", lsn, + "replication origin %q on n3 should be advanced past 0/0 (got %s); "+ + "a zeroed origin risks the apply worker replaying historical WAL", + slotName, lsn) + }) +} + +// e2eReplicationSlotName mirrors postgres.ReplicationSlotName without +// importing the server package from the e2e test binary. 
+// Format: spk_{database}_{provider}_sub_{provider}_{subscriber} +func e2eReplicationSlotName(databaseName, providerNode, subscriberNode string) string { + return fmt.Sprintf("spk_%s_%s_sub_%s_%s", + databaseName, providerNode, providerNode, subscriberNode) +} diff --git a/e2e/minor_version_upgrade_test.go b/e2e/minor_version_upgrade_test.go index 74360fac..dfbd8e57 100644 --- a/e2e/minor_version_upgrade_test.go +++ b/e2e/minor_version_upgrade_test.go @@ -25,8 +25,8 @@ func TestMinorVersionUpgrade(t *testing.T) { username := "admin" password := "password" - fromVersion := "18.2" - toVersion := "18.3" + fromVersion := "18.3" + toVersion := "18.4" ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) defer cancel() diff --git a/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json b/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json index f51bec04..869f0e4a 100644 --- a/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json +++ b/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json @@ -97,6 +97,12 @@ } ], [ + { + "type": "create", + "resource_id": "database.peer_catchup::n1:n2:test", + "reason": "does_not_exist", + "diff": null + }, { "type": "create", "resource_id": "database.wait_for_sync_event::n2:n1:test", @@ -143,6 +149,14 @@ "reason": "does_not_exist", "diff": null } + ], + [ + { + "type": "create", + "resource_id": "database.replication_origin_advance::n2:n3:test", + "reason": "does_not_exist", + "diff": null + } ] ], [ @@ -211,7 +225,7 @@ [ { "type": "delete", - "resource_id": "database.replication_slot_advance_from_cts::n2:n3:test", + "resource_id": "database.replication_origin_advance::n2:n3:test", "diff": null }, { @@ -226,6 +240,13 @@ "resource_id": "database.dump_roles::n1", "diff": null }, + { + "type": "delete", 
+ "resource_id": 
"database.replication_slot_advance_from_cts::n2:n3:test", + "diff": null + } + ], + [ { "type": "delete", "resource_id": "database.lag_tracker_commit_ts::n2:n3:test", @@ -247,6 +268,11 @@ } ], [ + { + "type": "delete", + "resource_id": "database.peer_catchup::n1:n2:test", + "diff": null + }, { "type": "delete", "resource_id": "database.wait_for_sync_event::n2:n1:test", diff --git a/server/internal/database/operations/populate_nodes.go b/server/internal/database/operations/populate_nodes.go index dba4f867..9aa84eef 100644 --- a/server/internal/database/operations/populate_nodes.go +++ b/server/internal/database/operations/populate_nodes.go @@ -38,6 +38,7 @@ func PopulateNode(node *NodeResources, existingNodeNames []string) (*resource.St peerWaitForSync = append( peerWaitForSync, database.WaitForSyncEventResourceIdentifier(peer, node.SourceNode, dbName), + database.PeerCatchupResourceIdentifier(node.SourceNode, peer, dbName), ) } @@ -125,6 +126,13 @@ func addPeerResources( ProviderNode: peerNode, SubscriberNode: sourceNode, }, + // Belt-and-suspenders: also wait using remote_lsn, which + // tracks actual commit application rather than WAL receipt. + &database.PeerCatchupResource{ + DatabaseName: dbName, + SourceNode: sourceNode, + PeerNode: peerNode, + }, // After the new node has caught up to the source node, we advance the // replication slots we created earlier. &database.LagTrackerCommitTimestampResource{ @@ -144,6 +152,13 @@ func addPeerResources( ProviderNode: peerNode, SubscriberNode: newNode, }, + // Origin advance runs on the subscriber's host; must be separate from + // slot advance which runs on the provider's host. 
+ &database.ReplicationOriginAdvanceResource{ + DatabaseName: dbName, + ProviderNode: peerNode, + SubscriberNode: newNode, + }, ) } diff --git a/server/internal/database/operations/populate_nodes_test.go b/server/internal/database/operations/populate_nodes_test.go index 6230a24f..7e41ee9d 100644 --- a/server/internal/database/operations/populate_nodes_test.go +++ b/server/internal/database/operations/populate_nodes_test.go @@ -114,6 +114,11 @@ func TestPopulateNode(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -127,6 +132,7 @@ func TestPopulateNode(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), }, }, &database.SyncEventResource{ @@ -152,6 +158,11 @@ func TestPopulateNode(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n3", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: "n3", + }, }, nil, ), @@ -304,6 +315,11 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -317,6 +333,7 @@ func TestPopulateNodes(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), }, }, &database.SyncEventResource{ @@ -342,6 +359,11 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n3", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: 
"n3", + }, }, nil, ), @@ -446,11 +468,21 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.WaitForSyncEventResource{ DatabaseName: "test", ProviderNode: "n3", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n3", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -464,7 +496,9 @@ func TestPopulateNodes(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), database.WaitForSyncEventResourceIdentifier("n3", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n3", "test"), }, }, &database.SyncEventResource{ @@ -498,11 +532,21 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n4", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: "n4", + }, &database.ReplicationSlotAdvanceFromCTSResource{ DatabaseName: "test", ProviderNode: "n3", SubscriberNode: "n4", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n3", + SubscriberNode: "n4", + }, }, nil, ), diff --git a/server/internal/database/peer_catchup_resource.go b/server/internal/database/peer_catchup_resource.go new file mode 100644 index 00000000..597b0e7f --- /dev/null +++ b/server/internal/database/peer_catchup_resource.go @@ -0,0 +1,110 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*PeerCatchupResource)(nil) + +const ResourceTypePeerCatchup resource.Type = "database.peer_catchup" + +func 
PeerCatchupResourceIdentifier(sourceNode, peerNode, databaseName string) resource.Identifier { + return resource.Identifier{ + Type: ResourceTypePeerCatchup, + ID: fmt.Sprintf("%s:%s:%s", sourceNode, peerNode, databaseName), + } +} + +// PeerCatchupResource waits until the source node's apply progress from the +// peer node has reached the peer's sync event LSN. This ensures the COPY +// snapshot (Phase 5 source→new subscription) includes all peer writes up to +// the slot creation point, preventing data loss on add-node. +// +// Uses spock.progress.remote_lsn (apply progress at last committed +// transaction) rather than received_lsn, which can advance on keepalive +// messages before commits have been applied. +type PeerCatchupResource struct { + DatabaseName string `json:"database_name"` + SourceNode string `json:"source_node"` // node where we check progress + PeerNode string `json:"peer_node"` // peer whose commits must be applied +} + +func (r *PeerCatchupResource) ResourceVersion() string { return "1" } +func (r *PeerCatchupResource) DiffIgnore() []string { return nil } + +func (r *PeerCatchupResource) Executor() resource.Executor { + return resource.PrimaryExecutor(r.SourceNode) +} + +func (r *PeerCatchupResource) Identifier() resource.Identifier { + return PeerCatchupResourceIdentifier(r.SourceNode, r.PeerNode, r.DatabaseName) +} + +func (r *PeerCatchupResource) Dependencies() []resource.Identifier { + return []resource.Identifier{ + SyncEventResourceIdentifier(r.PeerNode, r.SourceNode, r.DatabaseName), + } +} + +func (r *PeerCatchupResource) TypeDependencies() []resource.Type { + return nil +} + +func (r *PeerCatchupResource) Refresh(ctx context.Context, rc *resource.Context) error { + syncEvent, err := resource.FromContext[*SyncEventResource]( + rc, + SyncEventResourceIdentifier(r.PeerNode, r.SourceNode, r.DatabaseName), + ) + if err != nil { + return fmt.Errorf("failed to get sync event for peer %q: %w", r.PeerNode, err) + } + if syncEvent.SyncEventLsn 
== "" { + return resource.ErrNotFound + } + + source, err := GetPrimaryInstance(ctx, rc, r.SourceNode) + if err != nil { + return fmt.Errorf("failed to get source instance for node %q: %w", r.SourceNode, err) + } + conn, err := source.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to source node %q: %w", r.SourceNode, err) + } + defer conn.Close(ctx) + + const pollInterval = 500 * time.Millisecond + + for { + if ctx.Err() != nil { + return ctx.Err() + } + + reached, err := postgres.SpockProgressReachedLSN(r.PeerNode, syncEvent.SyncEventLsn). + Scalar(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query spock progress for peer %q: %w", r.PeerNode, err) + } + if reached { + return nil + } + + time.Sleep(pollInterval) + } +} + +func (r *PeerCatchupResource) Create(ctx context.Context, rc *resource.Context) error { + return r.Refresh(ctx, rc) +} + +func (r *PeerCatchupResource) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *PeerCatchupResource) Delete(ctx context.Context, rc *resource.Context) error { + return nil +} diff --git a/server/internal/database/replication_origin_advance_resource.go b/server/internal/database/replication_origin_advance_resource.go new file mode 100644 index 00000000..9b688c23 --- /dev/null +++ b/server/internal/database/replication_origin_advance_resource.go @@ -0,0 +1,99 @@ +package database + +import ( + "context" + "fmt" + + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*ReplicationOriginAdvanceResource)(nil) + +const ResourceTypeReplicationOriginAdvance resource.Type = "database.replication_origin_advance" + +func ReplicationOriginAdvanceResourceIdentifier(providerNode, subscriberNode, databaseName string) resource.Identifier { + return resource.Identifier{ + Type: ResourceTypeReplicationOriginAdvance, + ID: fmt.Sprintf("%s:%s:%s", 
providerNode, subscriberNode, databaseName), + } +} + +// ReplicationOriginAdvanceResource advances the replication origin on the +// subscriber to the LSN that ReplicationSlotAdvanceFromCTSResource recorded +// after advancing the provider-side slot. Both must be updated together to +// prevent the apply worker from replaying historical WAL from 0/0. +// +// Runs on the subscriber's host (cross-host connections are not allowed, so +// this must be separate from ReplicationSlotAdvanceFromCTSResource which runs +// on the provider's host). +type ReplicationOriginAdvanceResource struct { + DatabaseName string `json:"database_name"` + ProviderNode string `json:"provider_node"` + SubscriberNode string `json:"subscriber_node"` +} + +func (r *ReplicationOriginAdvanceResource) ResourceVersion() string { return "1" } +func (r *ReplicationOriginAdvanceResource) DiffIgnore() []string { return nil } + +func (r *ReplicationOriginAdvanceResource) Executor() resource.Executor { + return resource.PrimaryExecutor(r.SubscriberNode) +} + +func (r *ReplicationOriginAdvanceResource) Identifier() resource.Identifier { + return ReplicationOriginAdvanceResourceIdentifier(r.ProviderNode, r.SubscriberNode, r.DatabaseName) +} + +func (r *ReplicationOriginAdvanceResource) Dependencies() []resource.Identifier { + return []resource.Identifier{ + ReplicationSlotAdvanceFromCTSResourceIdentifier(r.ProviderNode, r.SubscriberNode, r.DatabaseName), + } +} + +func (r *ReplicationOriginAdvanceResource) TypeDependencies() []resource.Type { return nil } + +func (r *ReplicationOriginAdvanceResource) Refresh(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *ReplicationOriginAdvanceResource) Create(ctx context.Context, rc *resource.Context) error { + slotAdvance, err := resource.FromContext[*ReplicationSlotAdvanceFromCTSResource]( + rc, + ReplicationSlotAdvanceFromCTSResourceIdentifier(r.ProviderNode, r.SubscriberNode, r.DatabaseName), + ) + if err != nil { + return 
fmt.Errorf("failed to get slot advance resource: %w", err) + } + if slotAdvance.AdvancedToLSN == "" { + // Slot advance was skipped (slot active or no commit timestamp) — nothing to do. + return nil + } + + subscriber, err := GetPrimaryInstance(ctx, rc, r.SubscriberNode) + if err != nil { + return fmt.Errorf("failed to get subscriber instance for node %q: %w", r.SubscriberNode, err) + } + conn, err := subscriber.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to subscriber %q: %w", r.SubscriberNode, err) + } + defer conn.Close(ctx) + + slotName := postgres.ReplicationSlotName(r.DatabaseName, r.ProviderNode, r.SubscriberNode) + + if err := postgres.EnsureReplicationOriginExists(slotName).Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to ensure replication origin on subscriber %q: %w", r.SubscriberNode, err) + } + if err := postgres.AdvanceReplicationOrigin(slotName, slotAdvance.AdvancedToLSN).Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to advance replication origin on subscriber %q: %w", r.SubscriberNode, err) + } + return nil +} + +func (r *ReplicationOriginAdvanceResource) Update(ctx context.Context, rc *resource.Context) error { + return r.Create(ctx, rc) +} + +func (r *ReplicationOriginAdvanceResource) Delete(ctx context.Context, rc *resource.Context) error { + return nil +} diff --git a/server/internal/database/replication_slot_advance_from_cts_resource.go b/server/internal/database/replication_slot_advance_from_cts_resource.go index 3b0cc238..4b3fb710 100644 --- a/server/internal/database/replication_slot_advance_from_cts_resource.go +++ b/server/internal/database/replication_slot_advance_from_cts_resource.go @@ -25,16 +25,22 @@ func ReplicationSlotAdvanceFromCTSResourceIdentifier(providerNode, subscriberNod // ReplicationSlotAdvanceFromCTSResource advances the replication slot on the provider // to the LSN derived from the commit timestamp captured in lag_tracker. 
+// AdvancedToLSN is written as output after a successful advance so that +// ReplicationOriginAdvanceResource (running on the subscriber) can read it. type ReplicationSlotAdvanceFromCTSResource struct { DatabaseName string `json:"database_name"` ProviderNode string `json:"provider_node"` // slot lives here SubscriberNode string `json:"subscriber_node"` // target/receiver node + + // Output: LSN the slot was advanced to (empty if advance was skipped). + AdvancedToLSN string `json:"advanced_to_lsn,omitempty"` } func (r *ReplicationSlotAdvanceFromCTSResource) ResourceVersion() string { return "1" } -// No diff-ignore fields needed; this always executes idempotently when asked. -func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string { return nil } +func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string { + return []string{"advanced_to_lsn"} +} // Execute on the provider node (the slot exists there). func (r *ReplicationSlotAdvanceFromCTSResource) Executor() resource.Executor { @@ -62,6 +68,8 @@ func (r *ReplicationSlotAdvanceFromCTSResource) Refresh(ctx context.Context, rc } func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc *resource.Context) error { + r.AdvancedToLSN = "" + // Fetch commit timestamp from lag tracker resource lagTracker, err := resource.FromContext[*LagTrackerCommitTimestampResource]( rc, @@ -142,6 +150,9 @@ func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc * return fmt.Errorf("failed to advance replication slot: %w", err) } + // Record the LSN so ReplicationOriginAdvanceResource (running on the + // subscriber's host) can advance the origin to the same position. 
+ r.AdvancedToLSN = targetLSN return nil } diff --git a/server/internal/database/resources.go b/server/internal/database/resources.go index 13f4665a..998b1190 100644 --- a/server/internal/database/resources.go +++ b/server/internal/database/resources.go @@ -12,6 +12,8 @@ func RegisterResourceTypes(registry *resource.Registry) { resource.RegisterResourceType[*ReplicationSlotResource](registry, ResourceTypeReplicationSlot) resource.RegisterResourceType[*LagTrackerCommitTimestampResource](registry, ResourceTypeLagTrackerCommitTS) resource.RegisterResourceType[*ReplicationSlotAdvanceFromCTSResource](registry, ResourceTypeReplicationSlotAdvanceFromCTS) + resource.RegisterResourceType[*ReplicationOriginAdvanceResource](registry, ResourceTypeReplicationOriginAdvance) + resource.RegisterResourceType[*PeerCatchupResource](registry, ResourceTypePeerCatchup) resource.RegisterResourceType[*SwitchoverResource](registry, ResourceTypeSwitchover) resource.RegisterResourceType[*PostgresDatabaseResource](registry, ResourceTypePostgresDatabase) resource.RegisterResourceType[*DumpRolesResource](registry, ResourceTypeDumpRoles) diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index 3189658e..45e97098 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ -85,10 +85,11 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con return ctx.Err() } - // Check subscription health first — fail early if broken. - // Only statuses where the spock worker is running can make - // progress. The others ("disabled", "down") mean sync will - // never complete. + // Check subscription health. "disabled" and "down" are transient + // during add-node: the apply worker may not have started yet. + // Keep polling until the context deadline rather than failing fast. 
+ // Only statuses where the worker is confirmed broken warrant an + // immediate error. status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode). Scalar(ctx, subscriberConn) if err != nil { @@ -100,6 +101,14 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con switch status { case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown: // Worker is running — continue waiting + case postgres.SubStatusDisabled, postgres.SubStatusDown: + // Worker not yet started; transient — keep polling + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollInterval): + } + continue default: return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s", status, r.ProviderNode, r.SubscriberNode) diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index a474c252..9fcf20a5 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -34,9 +34,11 @@ func NewVersions(cfg config.Config) *Versions { versions.addImage(ds.MustPgEdgeVersion("16.12", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.12-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("16.13", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "16.13-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "16.13-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("16.14", "5"), &Images{ + PgEdgeImage: imageTag(cfg, "16.14-spock5.0.8-standard-1"), }) // pg17 @@ -50,7 +52,10 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "17.8-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("17.9", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "17.9-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "17.9-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("17.10", "5"), &Images{ + PgEdgeImage: imageTag(cfg, 
"17.10-spock5.0.8-standard-1"), }) // pg18 @@ -64,10 +69,13 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "18.2-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("18.3", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "18.3-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "18.3-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("18.4", "5"), &Images{ + PgEdgeImage: imageTag(cfg, "18.4-spock5.0.8-standard-1"), }) - versions.defaultVersion = ds.MustPgEdgeVersion("18.3", "5") + versions.defaultVersion = ds.MustPgEdgeVersion("18.4", "5") return versions } diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index 90d2e1d5..cd6e0107 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -402,6 +402,52 @@ func AdvanceReplicationSlotToLSN(databaseName, providerNode, subscriberNode stri } } +func EnsureReplicationOriginExists(slotName string) ConditionalStatement { + return ConditionalStatement{ + If: Query[bool]{ + SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = @slot_name);", + Args: pgx.NamedArgs{"slot_name": slotName}, + }, + Then: Statement{ + SQL: "SELECT pg_replication_origin_create(@slot_name);", + Args: pgx.NamedArgs{"slot_name": slotName}, + }, + } +} + +func AdvanceReplicationOrigin(slotName, lsn string) Statement { + return Statement{ + SQL: "SELECT pg_replication_origin_advance(@slot_name, @lsn::pg_lsn);", + Args: pgx.NamedArgs{ + "slot_name": slotName, + "lsn": lsn, + }, + } +} + +// SpockProgressReachedLSN reports whether the local node's apply progress +// from the named peer has reached targetLSN. Uses remote_lsn (the LSN of the +// last applied commit in Spock 5.x) rather than received_lsn, which can +// advance on keepalive messages before any commits have been applied. 
+func SpockProgressReachedLSN(peerNodeName, targetLSN string) Query[bool] { + return Query[bool]{ + SQL: ` + SELECT COALESCE( + (SELECT p.remote_lsn >= @target_lsn::pg_lsn + FROM spock.progress p + JOIN spock.node n ON n.node_id = p.remote_node_id + WHERE p.node_id = (SELECT node_id FROM spock.node_info()) + AND n.node_name = @peer_node_name), + false + ) + `, + Args: pgx.NamedArgs{ + "peer_node_name": peerNodeName, + "target_lsn": targetLSN, + }, + } +} + // GetSubscriptionStatus returns the current status of a specific subscription func GetSubscriptionStatus(providerNode, subscriberNode string) Query[string] { return Query[string]{