Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions e2e/add_node_data_safety_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//go:build e2e_test

package e2e

import (
"context"
"fmt"
"testing"
"time"

"github.com/jackc/pgx/v5"
controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestAddNodeOriginAdvanced verifies that after adding a node the replication
// origin on the new subscriber has been advanced past 0/0. A zeroed origin
// causes the apply worker to start from the beginning of WAL, producing
// duplicate-key errors or silently overwriting rows.
//
// Covers: Change 1 — EnsureReplicationOriginExists + AdvanceReplicationOrigin
// wired into ReplicationSlotAdvanceFromCTSResource.
// Ref: zodan.sql:2071-2073, 2183-2185; spock PR #397.
func TestAddNodeOriginAdvanced(t *testing.T) {
t.Parallel()

const (
username = "admin"
password = "password"
dbName = "origin_adv_db"
)

ctx, cancel := context.WithTimeout(t.Context(), 7*time.Minute)
defer cancel()

hostIDs := fixture.HostIDs()
db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{
Spec: &controlplane.DatabaseSpec{
DatabaseName: dbName,
Port: pointerTo(0),
PatroniPort: pointerTo(0),
DatabaseUsers: []*controlplane.DatabaseUserSpec{{
Username: username,
Password: pointerTo(password),
DbOwner: pointerTo(true),
Attributes: []string{"LOGIN", "SUPERUSER"},
}},
Nodes: []*controlplane.DatabaseNodeSpec{
{Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[0])}},
{Name: "n2", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[1])}},
},
},
})

// Write rows on n2 so its WAL position is meaningfully ahead of the slot's
// consistent_point. This gives the origin advancement a non-trivial LSN.
n2Opts := ConnectionOptions{
Matcher: And(WithNode("n2"), WithRole("primary")),
Username: username,
Password: password,
}
db.WithConnection(ctx, n2Opts, t, func(conn *pgx.Conn) {
_, err := conn.Exec(ctx, `CREATE TABLE origin_probe (id INT PRIMARY KEY, v TEXT)`)
require.NoError(t, err)

for i := 1; i <= 100; i++ {
_, err = conn.Exec(ctx, `INSERT INTO origin_probe VALUES ($1, $2)`, i, fmt.Sprintf("r%d", i))
require.NoError(t, err)
}
})

// Add n3 with n1 as source.
db.Spec.Nodes = append(db.Spec.Nodes, &controlplane.DatabaseNodeSpec{
Name: "n3",
HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[2])},
SourceNode: pointerTo("n1"),
})
require.NoError(t, db.Update(ctx, UpdateOptions{Spec: db.Spec}))

// The replication slot spk_<db>_n2_sub_n2_n3 lives on n2.
// The origin with the same name lives on n3 (subscriber side).
slotName := e2eReplicationSlotName(dbName, "n2", "n3")

n3Opts := ConnectionOptions{
Matcher: And(WithNode("n3"), WithRole("primary")),
Username: username,
Password: password,
}
db.WithConnection(ctx, n3Opts, t, func(conn *pgx.Conn) {
// Query progress; COALESCE returns '0/0' when the origin is absent or
// has never been advanced, so a single assert covers both failure modes.
var lsn string
err := conn.QueryRow(ctx, `
SELECT COALESCE(
(SELECT pg_replication_origin_progress($1, false)::text
FROM pg_replication_origin WHERE roname = $1),
'0/0'
)`, slotName,
).Scan(&lsn)
require.NoError(t, err)

assert.NotEqual(t, "0/0", lsn,
"replication origin %q on n3 should be advanced past 0/0 (got %s); "+
"a zeroed origin risks the apply worker replaying historical WAL",
slotName, lsn)
})
}

// e2eReplicationSlotName mirrors postgres.ReplicationSlotName without
// importing the server package from the e2e test binary.
// Format: spk_<db>_<provider>_sub_<provider>_<subscriber>
func e2eReplicationSlotName(databaseName, providerNode, subscriberNode string) string {
return fmt.Sprintf("spk_%s_%s_sub_%s_%s",
databaseName, providerNode, providerNode, subscriberNode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@
}
],
[
{
"type": "create",
"resource_id": "database.peer_catchup::n1:n2:test",
"reason": "does_not_exist",
"diff": null
},
{
"type": "create",
"resource_id": "database.wait_for_sync_event::n2:n1:test",
Expand Down Expand Up @@ -143,6 +149,14 @@
"reason": "does_not_exist",
"diff": null
}
],
[
{
"type": "create",
"resource_id": "database.replication_origin_advance::n2:n3:test",
"reason": "does_not_exist",
"diff": null
}
]
],
[
Expand Down Expand Up @@ -211,7 +225,7 @@
[
{
"type": "delete",
"resource_id": "database.replication_slot_advance_from_cts::n2:n3:test",
"resource_id": "database.replication_origin_advance::n2:n3:test",
"diff": null
},
{
Expand All @@ -226,6 +240,13 @@
"resource_id": "database.dump_roles::n1",
"diff": null
},
{
"type": "delete",
"resource_id": "database.replication_slot_advance_from_cts::n2:n3:test",
"diff": null
}
],
[
{
"type": "delete",
"resource_id": "database.lag_tracker_commit_ts::n2:n3:test",
Expand All @@ -247,6 +268,11 @@
}
],
[
{
"type": "delete",
"resource_id": "database.peer_catchup::n1:n2:test",
"diff": null
},
{
"type": "delete",
"resource_id": "database.wait_for_sync_event::n2:n1:test",
Expand Down
15 changes: 15 additions & 0 deletions server/internal/database/operations/populate_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func PopulateNode(node *NodeResources, existingNodeNames []string) (*resource.St
peerWaitForSync = append(
peerWaitForSync,
database.WaitForSyncEventResourceIdentifier(peer, node.SourceNode, dbName),
database.PeerCatchupResourceIdentifier(node.SourceNode, peer, dbName),
)
}

Expand Down Expand Up @@ -125,6 +126,13 @@ func addPeerResources(
ProviderNode: peerNode,
SubscriberNode: sourceNode,
},
// Belt-and-suspenders: also wait using remote_lsn, which
// tracks actual commit application rather than WAL receipt.
&database.PeerCatchupResource{
DatabaseName: dbName,
SourceNode: sourceNode,
PeerNode: peerNode,
},
// After the new node has caught up to the source node, we advance the
// replication slots we created earlier.
&database.LagTrackerCommitTimestampResource{
Expand All @@ -144,6 +152,13 @@ func addPeerResources(
ProviderNode: peerNode,
SubscriberNode: newNode,
},
// Origin advance runs on the subscriber's host; must be separate from
// slot advance which runs on the provider's host.
&database.ReplicationOriginAdvanceResource{
DatabaseName: dbName,
ProviderNode: peerNode,
SubscriberNode: newNode,
},
)
}

Expand Down
44 changes: 44 additions & 0 deletions server/internal/database/operations/populate_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func TestPopulateNode(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n1",
},
&database.PeerCatchupResource{
DatabaseName: "test",
SourceNode: "n1",
PeerNode: "n2",
},
&database.ReplicationSlotResource{
DatabaseName: "test",
ProviderNode: "n1",
Expand All @@ -127,6 +132,7 @@ func TestPopulateNode(t *testing.T) {
SyncData: true,
ExtraDependencies: []resource.Identifier{
database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"),
database.PeerCatchupResourceIdentifier("n1", "n2", "test"),
},
},
&database.SyncEventResource{
Expand All @@ -152,6 +158,11 @@ func TestPopulateNode(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n3",
},
&database.ReplicationOriginAdvanceResource{
DatabaseName: "test",
ProviderNode: "n2",
SubscriberNode: "n3",
},
},
nil,
),
Expand Down Expand Up @@ -304,6 +315,11 @@ func TestPopulateNodes(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n1",
},
&database.PeerCatchupResource{
DatabaseName: "test",
SourceNode: "n1",
PeerNode: "n2",
},
&database.ReplicationSlotResource{
DatabaseName: "test",
ProviderNode: "n1",
Expand All @@ -317,6 +333,7 @@ func TestPopulateNodes(t *testing.T) {
SyncData: true,
ExtraDependencies: []resource.Identifier{
database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"),
database.PeerCatchupResourceIdentifier("n1", "n2", "test"),
},
},
&database.SyncEventResource{
Expand All @@ -342,6 +359,11 @@ func TestPopulateNodes(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n3",
},
&database.ReplicationOriginAdvanceResource{
DatabaseName: "test",
ProviderNode: "n2",
SubscriberNode: "n3",
},
},
nil,
),
Expand Down Expand Up @@ -446,11 +468,21 @@ func TestPopulateNodes(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n1",
},
&database.PeerCatchupResource{
DatabaseName: "test",
SourceNode: "n1",
PeerNode: "n2",
},
&database.WaitForSyncEventResource{
DatabaseName: "test",
ProviderNode: "n3",
SubscriberNode: "n1",
},
&database.PeerCatchupResource{
DatabaseName: "test",
SourceNode: "n1",
PeerNode: "n3",
},
&database.ReplicationSlotResource{
DatabaseName: "test",
ProviderNode: "n1",
Expand All @@ -464,7 +496,9 @@ func TestPopulateNodes(t *testing.T) {
SyncData: true,
ExtraDependencies: []resource.Identifier{
database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"),
database.PeerCatchupResourceIdentifier("n1", "n2", "test"),
database.WaitForSyncEventResourceIdentifier("n3", "n1", "test"),
database.PeerCatchupResourceIdentifier("n1", "n3", "test"),
},
},
&database.SyncEventResource{
Expand Down Expand Up @@ -498,11 +532,21 @@ func TestPopulateNodes(t *testing.T) {
ProviderNode: "n2",
SubscriberNode: "n4",
},
&database.ReplicationOriginAdvanceResource{
DatabaseName: "test",
ProviderNode: "n2",
SubscriberNode: "n4",
},
&database.ReplicationSlotAdvanceFromCTSResource{
DatabaseName: "test",
ProviderNode: "n3",
SubscriberNode: "n4",
},
&database.ReplicationOriginAdvanceResource{
DatabaseName: "test",
ProviderNode: "n3",
SubscriberNode: "n4",
},
},
nil,
),
Expand Down
Loading