Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1alpha1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
// ConditionTypeConfigurationInSync indicates that cluster configuration is in desired state.
ConditionTypeConfigurationInSync ConditionType = "ConfigurationInSync"
ConditionReasonConfigurationChanged ConditionReason = "ConfigurationChanged"
ConditionReasonConfigReloadFailed ConditionReason = "ConfigReloadFailed"
ConditionReasonConfigReloadPending ConditionReason = "ConfigReloadPending"

// ConditionTypeVersionInSync indicates that all replicas report the same version as the image.
ConditionTypeVersionInSync ConditionType = "VersionInSync"
Expand Down
43 changes: 35 additions & 8 deletions internal/controller/clickhouse/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,50 @@ func (cmd *commander) Close() {
cmd.conns = map[v1.ClickHouseReplicaID]clickhouse.Conn{}
}

func (cmd *commander) Version(ctx context.Context, id v1.ClickHouseReplicaID) (string, error) {
type replicaProbe struct {
Version string
ReloadConfigRevision string
}

// Probe reads the replica server version and the latest applied reload-safe config revision.
func (cmd *commander) Probe(ctx context.Context, id v1.ClickHouseReplicaID) (replicaProbe, error) {
conn, err := cmd.getConn(id)
if err != nil {
return "", fmt.Errorf("failed to get connection for replica %s: %w", id, err)
return replicaProbe{}, fmt.Errorf("failed to get connection for replica %s: %w", id, err)
}

var version string
if err := conn.QueryRow(ctx, "SELECT version()").Scan(&version); err != nil {
return "", fmt.Errorf("query version on replica %s: %w", id, err)
var probe replicaProbe

row := conn.QueryRow(ctx,
"SELECT version(),"+
" ifNull((SELECT collection[?] FROM system.named_collections WHERE name = ?), '')"+
" SETTINGS format_display_secrets_in_show_and_select=1",
OperatorConfigRevisionField, OperatorNamedCollectionName,
)
if err := row.Scan(&probe.Version, &probe.ReloadConfigRevision); err != nil {
return replicaProbe{}, fmt.Errorf("probe replica %s: %w", id, err)
}

version, err = controllerutil.ParseVersion(version)
probe.Version, err = controllerutil.ParseVersion(probe.Version)
if err != nil {
return "", fmt.Errorf("parse version from replica %s response: %w", id, err)
return replicaProbe{}, fmt.Errorf("parse version from replica %s response: %w", id, err)
}

return version, nil
return probe, nil
}

// ReloadConfig queries the replica to reload its configuration.
func (cmd *commander) ReloadConfig(ctx context.Context, id v1.ClickHouseReplicaID) error {
conn, err := cmd.getConn(id)
if err != nil {
return fmt.Errorf("failed to get connection for replica %s: %w", id, err)
}

if err := conn.Exec(ctx, "SYSTEM RELOAD CONFIG"); err != nil {
return fmt.Errorf("reload config on replica %s: %w", id, err)
}

return nil
}

func (cmd *commander) SyncDatabases(ctx context.Context, log controllerutil.Logger, replicas []v1.ClickHouseReplicaID) bool {
Expand Down
13 changes: 9 additions & 4 deletions internal/controller/clickhouse/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
testUsername = "operator"
keeperImage = "clickhouse/clickhouse-keeper:26.2"
clickhouseImage = "clickhouse/clickhouse-server:26.2"
testConfigRevision = "test-revision-v1"
)

func generateKeeperConfig() *strings.Reader {
Expand Down Expand Up @@ -70,8 +71,11 @@ remote_servers:
default:
shard:
replica: [%s]
named_collections:
__operator:
config_revision: %s
display_secrets_in_show_and_select: true
`, replica, keeperHostname, keeper.PortNative, strings.Join(replicas, ",")))
`, replica, keeperHostname, keeper.PortNative, strings.Join(replicas, ","), testConfigRevision))
}

func generateUsersConfig() *strings.Reader {
Expand Down Expand Up @@ -218,12 +222,13 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {
})
})

It("should get every node version", func(ctx context.Context) {
It("should probe version and config revision in a single query", func(ctx context.Context) {
for id := range cmd.cluster.ReplicaIDs() {
Eventually(func(g Gomega) {
v, err := cmd.Version(ctx, id)
probe, err := cmd.Probe(ctx, id)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(v).To(MatchRegexp(`^\d+\.\d+\.\d+\.\d+$`))
g.Expect(probe.Version).To(MatchRegexp(`^\d+\.\d+\.\d+\.\d+$`))
g.Expect(probe.ReloadConfigRevision).To(Equal(testConfigRevision))
}, "1m", "100ms").Should(Succeed())
}
})
Expand Down
Loading
Loading