diff --git a/api/v1alpha1/hyperbytedbcluster_types.go b/api/v1alpha1/hyperbytedbcluster_types.go index a89e6f0..66b5b1e 100644 --- a/api/v1alpha1/hyperbytedbcluster_types.go +++ b/api/v1alpha1/hyperbytedbcluster_types.go @@ -55,9 +55,6 @@ type HyperbytedbClusterSpec struct { // +optional Flush FlushSpec `json:"flush,omitempty"` - // +optional - Compaction CompactionSpec `json:"compaction,omitempty"` - // +optional ChDB ChDBSpec `json:"chdb,omitempty"` @@ -85,6 +82,9 @@ type HyperbytedbClusterSpec struct { // +optional RateLimit RateLimitSpec `json:"rateLimit,omitempty"` + // +optional + Retention RetentionSpec `json:"retention,omitempty"` + // +optional Monitoring MonitoringSpec `json:"monitoring,omitempty"` @@ -167,15 +167,8 @@ type CertManagerIssuerRef struct { } type StorageSpec struct { - // +kubebuilder:default="local" - // +kubebuilder:validation:Enum=local;s3 - Backend string `json:"backend,omitempty"` - // +optional VolumeClaimTemplate *PersistentVolumeClaimSpec `json:"volumeClaimTemplate,omitempty"` - - // +optional - S3 *S3StorageSpec `json:"s3,omitempty"` } type PersistentVolumeClaimSpec struct { @@ -186,16 +179,6 @@ type PersistentVolumeClaimSpec struct { Size resource.Quantity `json:"size,omitempty"` } -type S3StorageSpec struct { - Bucket string `json:"bucket"` - Prefix string `json:"prefix,omitempty"` - Region string `json:"region,omitempty"` - Endpoint string `json:"endpoint,omitempty"` - - // Reference to a Secret containing access_key_id and secret_access_key. - CredentialsSecretName string `json:"credentialsSecretName,omitempty"` -} - type FlushSpec struct { // +kubebuilder:default=10 IntervalSecs int32 `json:"intervalSecs,omitempty"` @@ -206,7 +189,8 @@ type FlushSpec struct { // +kubebuilder:default="1h" TimeBucketDuration string `json:"timeBucketDuration,omitempty"` - // Maximum points buffered per measurement before forcing a flush. 0 = unlimited. + // Max points per chDB insert batch (clamped server-side to 10k–500k). + // +kubebuilder:default=50000 // +optional // +kubebuilder:validation:Minimum=0 MaxPointsPerBatch int32 `json:"maxPointsPerBatch,omitempty"` @@ -222,50 +206,15 @@ type FlushSpec struct { WALBatchDelayUs int64 `json:"walBatchDelayUs,omitempty"` } -type CompactionSpec struct { - // +kubebuilder:default=true - Enabled *bool `json:"enabled,omitempty"` - - // +kubebuilder:default=300 - IntervalSecs int32 `json:"intervalSecs,omitempty"` - - // +kubebuilder:default=4 - MinFilesToCompact int32 `json:"minFilesToCompact,omitempty"` - - // +kubebuilder:default=256 - TargetFileSizeMB int32 `json:"targetFileSizeMb,omitempty"` - - // Compaction time bucket. "1h" (hourly, default) or "1d" (daily). Daily - // buckets merge all 24 hourly files, reducing file count for wide-range queries. - // +optional - // +kubebuilder:validation:Enum="1h";"1d";"24h" - BucketDuration string `json:"bucketDuration,omitempty"` - - // Minimum age (seconds) before per-node files are hash-verified and merged - // into a single compacted file. - // +optional - // +kubebuilder:validation:Minimum=0 - VerifiedCompactionAgeSecs int64 `json:"verifiedCompactionAgeSecs,omitempty"` - - // When true, periodically compare each active peer's manifest against local - // bucket hashes and fetch divergent or missing slices. - // +optional - SelfRepairEnabled *bool `json:"selfRepairEnabled,omitempty"` - - // Max bucket-hash comparisons (and repair attempts) per compaction tick for - // membership self-repair. +type ChDBSpec struct { + // chDB session directory inside the data volume. Defaults to + // /var/lib/hyperbytedb/chdb when unset. // +optional - // +kubebuilder:validation:Minimum=0 - MaxRepairChecksPerCycle int32 `json:"maxRepairChecksPerCycle,omitempty"` + SessionDataPath string `json:"sessionDataPath,omitempty"` - // Max concurrent measurement compactions when running compact_all. - // +optional - // +kubebuilder:validation:Minimum=0 - CompactAllMaxInflight int32 `json:"compactAllMaxInflight,omitempty"` -} - -type ChDBSpec struct { - // +kubebuilder:default=4 + // Ignored by hyperbytedb (libchdb is a process-global singleton). Retained + // for API stability; always written as 1 in config.toml. + // +kubebuilder:default=1 PoolSize int32 `json:"poolSize,omitempty"` } @@ -286,6 +235,18 @@ type LoggingSpec struct { // +kubebuilder:default="text" // +kubebuilder:validation:Enum=text;json Format string `json:"format,omitempty"` + + // Emit per-phase performance logs (write/query/flush). Off by default. + // +optional + DetailedTrace *bool `json:"detailedTrace,omitempty"` + + // OTLP HTTP endpoint for trace export (e.g. http://alloy-logs:4318). + // +optional + OtlpEndpoint string `json:"otlpEndpoint,omitempty"` + + // Fraction of traces exported to OTLP (0.0–1.0). Default 1.0 when OTLP is set. + // +optional + OtlpSampleRatio string `json:"otlpSampleRatio,omitempty"` } type ClusterTuningSpec struct { @@ -295,17 +256,6 @@ type ClusterTuningSpec struct { // +kubebuilder:default=5 HeartbeatMissThreshold int32 `json:"heartbeatMissThreshold,omitempty"` - // +kubebuilder:default=60 - AntiEntropyIntervalSecs int32 `json:"antiEntropyIntervalSecs,omitempty"` - - // When false, hyperbytedb does not run periodic Merkle verify / delta sync. - // +kubebuilder:default=true - // +optional - AntiEntropyEnabled *bool `json:"antiEntropyEnabled,omitempty"` - - // +kubebuilder:default=4 - SyncMaxConcurrentFiles int32 `json:"syncMaxConcurrentFiles,omitempty"` - // +kubebuilder:default=5 ReplicationMaxRetries int32 `json:"replicationMaxRetries,omitempty"` @@ -428,6 +378,18 @@ type HintedHandoffSpec struct { MaxHintAgeSecs int64 `json:"maxHintAgeSecs,omitempty"` } +// RetentionSpec controls the background retention enforcement loop. +type RetentionSpec struct { + // +kubebuilder:default=true + // +optional + Enabled *bool `json:"enabled,omitempty"` + + // How often retention scans run (humantime duration, e.g. "60s", "5m", "1h"). + // +kubebuilder:default="60s" + // +optional + Interval string `json:"interval,omitempty"` +} + // RateLimitSpec controls per-endpoint request rate limiting. type RateLimitSpec struct { // +optional diff --git a/api/v1alpha1/hyperbytedbcluster_webhook.go b/api/v1alpha1/hyperbytedbcluster_webhook.go index 5dc8056..067d3f1 100644 --- a/api/v1alpha1/hyperbytedbcluster_webhook.go +++ b/api/v1alpha1/hyperbytedbcluster_webhook.go @@ -51,10 +51,6 @@ func (w *HyperbytedbClusterWebhook) Default(_ context.Context, obj *HyperbytedbC obj.Spec.Server.Port = 8086 } - if obj.Spec.Storage.Backend == "" { - obj.Spec.Storage.Backend = "local" - } - if obj.Spec.Storage.VolumeClaimTemplate == nil { obj.Spec.Storage.VolumeClaimTemplate = &PersistentVolumeClaimSpec{ Size: resource.MustParse("10Gi"), @@ -80,6 +76,14 @@ func (w *HyperbytedbClusterWebhook) Default(_ context.Context, obj *HyperbytedbC obj.Spec.Flush.TimeBucketDuration = "1h" } + if obj.Spec.ChDB.PoolSize == 0 { + obj.Spec.ChDB.PoolSize = 1 + } + + if obj.Spec.Retention.Interval == "" { + obj.Spec.Retention.Interval = "60s" + } + return nil } @@ -111,10 +115,6 @@ func validateCluster(cluster *HyperbytedbCluster) (admission.Warnings, error) { return nil, fmt.Errorf("server port must be between 1 and 65535, got %d", cluster.Spec.Server.Port) } - if cluster.Spec.Storage.Backend == "s3" && cluster.Spec.Storage.S3 == nil { - return nil, fmt.Errorf("storage.s3 configuration required when backend is 's3'") - } - if cluster.Spec.Autoscaling != nil && cluster.Spec.Autoscaling.Enabled { if cluster.Spec.Autoscaling.MaxReplicas < 1 { return nil, fmt.Errorf("autoscaling.maxReplicas must be at least 1") diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index f534186..222f46d 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -121,11 +121,6 @@ func (in *ChDBSpec) DeepCopy() *ChDBSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterTuningSpec) DeepCopyInto(out *ClusterTuningSpec) { *out = *in - if in.AntiEntropyEnabled != nil { - in, out := &in.AntiEntropyEnabled, &out.AntiEntropyEnabled - *out = new(bool) - **out = **in - } if in.Replication != nil { in, out := &in.Replication, &out.Replication *out = new(ReplicationSpec) @@ -148,31 +143,6 @@ func (in *ClusterTuningSpec) DeepCopy() *ClusterTuningSpec { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *CompactionSpec) DeepCopyInto(out *CompactionSpec) { - *out = *in - if in.Enabled != nil { - in, out := &in.Enabled, &out.Enabled - *out = new(bool) - **out = **in - } - if in.SelfRepairEnabled != nil { - in, out := &in.SelfRepairEnabled, &out.SelfRepairEnabled - *out = new(bool) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CompactionSpec. -func (in *CompactionSpec) DeepCopy() *CompactionSpec { - if in == nil { - return nil - } - out := new(CompactionSpec) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FailoverSpec) DeepCopyInto(out *FailoverSpec) { *out = *in @@ -412,16 +382,16 @@ func (in *HyperbytedbClusterSpec) DeepCopyInto(out *HyperbytedbClusterSpec) { in.Server.DeepCopyInto(&out.Server) in.Storage.DeepCopyInto(&out.Storage) out.Flush = in.Flush - in.Compaction.DeepCopyInto(&out.Compaction) out.ChDB = in.ChDB out.Auth = in.Auth - out.Logging = in.Logging + in.Logging.DeepCopyInto(&out.Logging) in.Resources.DeepCopyInto(&out.Resources) in.Cluster.DeepCopyInto(&out.Cluster) out.Cardinality = in.Cardinality in.StatementSummary.DeepCopyInto(&out.StatementSummary) in.HintedHandoff.DeepCopyInto(&out.HintedHandoff) in.RateLimit.DeepCopyInto(&out.RateLimit) + in.Retention.DeepCopyInto(&out.Retention) out.Monitoring = in.Monitoring if in.Autoscaling != nil { in, out := &in.Autoscaling, &out.Autoscaling @@ -657,6 +627,11 @@ func (in *HyperbytedbRestoreStatus) DeepCopy() *HyperbytedbRestoreStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LoggingSpec) DeepCopyInto(out *LoggingSpec) { *out = *in + if in.DetailedTrace != nil { + in, out := &in.DetailedTrace, &out.DetailedTrace + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoggingSpec. @@ -818,31 +793,36 @@ func (in *RestoreSource) DeepCopy() *RestoreSource { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *S3BackupSpec) DeepCopyInto(out *S3BackupSpec) { +func (in *RetentionSpec) DeepCopyInto(out *RetentionSpec) { *out = *in + if in.Enabled != nil { + in, out := &in.Enabled, &out.Enabled + *out = new(bool) + **out = **in + } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new S3BackupSpec. -func (in *S3BackupSpec) DeepCopy() *S3BackupSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RetentionSpec. +func (in *RetentionSpec) DeepCopy() *RetentionSpec { if in == nil { return nil } - out := new(S3BackupSpec) + out := new(RetentionSpec) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *S3StorageSpec) DeepCopyInto(out *S3StorageSpec) { +func (in *S3BackupSpec) DeepCopyInto(out *S3BackupSpec) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new S3StorageSpec. -func (in *S3StorageSpec) DeepCopy() *S3StorageSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new S3BackupSpec. +func (in *S3BackupSpec) DeepCopy() *S3BackupSpec { if in == nil { return nil } - out := new(S3StorageSpec) + out := new(S3BackupSpec) in.DeepCopyInto(out) return out } @@ -895,11 +875,6 @@ func (in *StorageSpec) DeepCopyInto(out *StorageSpec) { *out = new(PersistentVolumeClaimSpec) (*in).DeepCopyInto(*out) } - if in.S3 != nil { - in, out := &in.S3, &out.S3 - *out = new(S3StorageSpec) - **out = **in - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StorageSpec. diff --git a/config/crd/bases/hyperbytedb.hyperbytedb.io_hyperbytedbclusters.yaml b/config/crd/bases/hyperbytedb.hyperbytedb.io_hyperbytedbclusters.yaml index 45cc3b0..507e184 100644 --- a/config/crd/bases/hyperbytedb.hyperbytedb.io_hyperbytedbclusters.yaml +++ b/config/crd/bases/hyperbytedb.hyperbytedb.io_hyperbytedbclusters.yaml @@ -2977,21 +2977,20 @@ spec: chdb: properties: poolSize: - default: 4 + default: 1 + description: |- + Ignored by hyperbytedb (libchdb is a process-global singleton). Retained + for API stability; always written as 1 in config.toml. format: int32 type: integer + sessionDataPath: + description: |- + chDB session directory inside the data volume. Defaults to + /var/lib/hyperbytedb/chdb when unset. + type: string type: object cluster: properties: - antiEntropyEnabled: - default: true - description: When false, hyperbytedb does not run periodic Merkle - verify / delta sync. - type: boolean - antiEntropyIntervalSecs: - default: 60 - format: int32 - type: integer heartbeatIntervalSecs: default: 2 format: int32 @@ -3085,10 +3084,6 @@ spec: format: int64 minimum: 0 type: integer - syncMaxConcurrentFiles: - default: 4 - format: int32 - type: integer tls: description: TLS for inter-node replication traffic. properties: @@ -3117,58 +3112,6 @@ spec: - enabled type: object type: object - compaction: - properties: - bucketDuration: - description: |- - Compaction time bucket. "1h" (hourly, default) or "1d" (daily). Daily - buckets merge all 24 hourly files, reducing file count for wide-range queries. - enum: - - 1h - - 1d - - 24h - type: string - compactAllMaxInflight: - description: Max concurrent measurement compactions when running - compact_all. - format: int32 - minimum: 0 - type: integer - enabled: - default: true - type: boolean - intervalSecs: - default: 300 - format: int32 - type: integer - maxRepairChecksPerCycle: - description: |- - Max bucket-hash comparisons (and repair attempts) per compaction tick for - membership self-repair. - format: int32 - minimum: 0 - type: integer - minFilesToCompact: - default: 4 - format: int32 - type: integer - selfRepairEnabled: - description: |- - When true, periodically compare each active peer's manifest against local - bucket hashes and fetch divergent or missing slices. - type: boolean - targetFileSizeMb: - default: 256 - format: int32 - type: integer - verifiedCompactionAgeSecs: - description: |- - Minimum age (seconds) before per-node files are hash-verified and merged - into a single compacted file. - format: int64 - minimum: 0 - type: integer - type: object failover: description: FailoverSpec controls automatic failure detection and recovery. @@ -3199,8 +3142,9 @@ spec: format: int32 type: integer maxPointsPerBatch: - description: Maximum points buffered per measurement before forcing - a flush. 0 = unlimited. + default: 50000 + description: Max points per chDB insert batch (clamped server-side + to 10k–500k). format: int32 minimum: 0 type: integer @@ -3271,6 +3215,10 @@ spec: type: array logging: properties: + detailedTrace: + description: Emit per-phase performance logs (write/query/flush). + Off by default. + type: boolean format: default: text enum: @@ -3286,6 +3234,13 @@ spec: - warn - error type: string + otlpEndpoint: + description: OTLP HTTP endpoint for trace export (e.g. http://alloy-logs:4318). + type: string + otlpSampleRatio: + description: Fraction of traces exported to OTLP (0.0–1.0). Default + 1.0 when OTLP is set. + type: string type: object monitoring: properties: @@ -3560,6 +3515,19 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + retention: + description: RetentionSpec controls the background retention enforcement + loop. + properties: + enabled: + default: true + type: boolean + interval: + default: 60s + description: How often retention scans run (humantime duration, + e.g. "60s", "5m", "1h"). + type: string + type: object server: properties: maxBodySizeBytes: @@ -3629,29 +3597,6 @@ spec: type: object storage: properties: - backend: - default: local - enum: - - local - - s3 - type: string - s3: - properties: - bucket: - type: string - credentialsSecretName: - description: Reference to a Secret containing access_key_id - and secret_access_key. - type: string - endpoint: - type: string - prefix: - type: string - region: - type: string - required: - - bucket - type: object volumeClaimTemplate: properties: size: diff --git a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_cluster.yaml b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_cluster.yaml index f2c3dce..373b83a 100644 --- a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_cluster.yaml +++ b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_cluster.yaml @@ -11,21 +11,21 @@ spec: port: 8086 requestTimeoutSecs: 30 queryTimeoutSecs: 30 + maxConcurrentQueries: 32 storage: - backend: local volumeClaimTemplate: size: 10Gi flush: intervalSecs: 10 walSizeThresholdMb: 64 timeBucketDuration: "1h" - compaction: - enabled: true - intervalSecs: 300 - minFilesToCompact: 4 - targetFileSizeMb: 256 + maxPointsPerBatch: 50000 chdb: - poolSize: 4 + sessionDataPath: /var/lib/hyperbytedb/chdb + poolSize: 1 + retention: + enabled: true + interval: 60s logging: level: info format: json @@ -39,10 +39,12 @@ spec: cluster: heartbeatIntervalSecs: 2 heartbeatMissThreshold: 5 - antiEntropyIntervalSecs: 60 replicationMaxRetries: 5 raftHeartbeatIntervalMs: 300 raftElectionTimeoutMs: 1000 + replication: + mode: async + ackTimeoutMs: 5000 monitoring: enabled: true serviceMonitor: true diff --git a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_ha.yaml b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_ha.yaml index 1b62086..76c2324 100644 --- a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_ha.yaml +++ b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_ha.yaml @@ -12,33 +12,39 @@ spec: requestTimeoutSecs: 60 queryTimeoutSecs: 60 storage: - backend: s3 volumeClaimTemplate: size: 50Gi storageClassName: fast-ssd - s3: - bucket: hyperbytedb-data - prefix: "production/" - region: us-east-1 - endpoint: "" - credentialsSecretName: hyperbytedb-s3-credentials flush: intervalSecs: 5 walSizeThresholdMb: 128 timeBucketDuration: "1h" - compaction: - enabled: true - intervalSecs: 120 - minFilesToCompact: 4 - targetFileSizeMb: 512 chdb: - poolSize: 8 + sessionDataPath: /var/lib/hyperbytedb/chdb auth: enabled: true credentialsSecretName: hyperbytedb-auth logging: level: info format: json + otlpEndpoint: http://alloy-logs:4318 + otlpSampleRatio: "0.1" + cardinality: + maxTagValuesPerMeasurement: 100000 + maxMeasurementsPerDatabase: 10000 + statementSummary: + enabled: true + maxEntries: 1000 + hintedHandoff: + enabled: true + maxHintsPerPeer: 100000 + maxHintAgeSecs: 3600 + rateLimit: + enabled: true + maxRequestsPerSecond: 1000 + retention: + enabled: true + interval: 5m resources: requests: cpu: "2" @@ -49,12 +55,20 @@ spec: cluster: heartbeatIntervalSecs: 1 heartbeatMissThreshold: 3 - antiEntropyIntervalSecs: 30 - syncMaxConcurrentFiles: 8 replicationMaxRetries: 10 raftHeartbeatIntervalMs: 200 raftElectionTimeoutMs: 800 raftSnapshotThreshold: 500 + replicationQueueDepth: 8192 + replicationMaxInflightBatches: 8 + replicationMaxCoalesceBodyBytes: 8388608 + replicateReceiverQueueDepth: 1024 + replicationTruncateStalePeerMultiplier: 2 + replication: + mode: sync_quorum + ackTimeoutMs: 5000 + syncQuorum: + minAcks: majority monitoring: enabled: true serviceMonitor: true diff --git a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_single.yaml b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_single.yaml index 78feef6..821aa06 100644 --- a/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_single.yaml +++ b/config/samples/hyperbytedb_v1alpha1_hyperbytedbcluster_single.yaml @@ -10,13 +10,13 @@ spec: server: port: 8086 storage: - backend: local volumeClaimTemplate: size: 5Gi flush: intervalSecs: 10 - compaction: + retention: enabled: true + interval: 60s logging: level: info format: text diff --git a/dist/chart/templates/crd/hyperbytedbclusters.hyperbytedb.hyperbytedb.io.yaml b/dist/chart/templates/crd/hyperbytedbclusters.hyperbytedb.hyperbytedb.io.yaml index 4de2b5d..507e184 100644 --- a/dist/chart/templates/crd/hyperbytedbclusters.hyperbytedb.hyperbytedb.io.yaml +++ b/dist/chart/templates/crd/hyperbytedbclusters.hyperbytedb.hyperbytedb.io.yaml @@ -1,11 +1,8 @@ -{{- if .Values.crd.enable }} +--- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - {{- if .Values.crd.keep }} - "helm.sh/resource-policy": keep - {{- end }} controller-gen.kubebuilder.io/version: v0.20.1 name: hyperbytedbclusters.hyperbytedb.hyperbytedb.io spec: @@ -36,7 +33,8 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: HyperbytedbCluster is the Schema for the hyperbytedbclusters API. + description: HyperbytedbCluster is the Schema for the hyperbytedbclusters + API. properties: apiVersion: description: |- @@ -2963,24 +2961,36 @@ spec: - enabled - maxReplicas type: object + cardinality: + description: CardinalitySpec configures cardinality limits enforced + by hyperbytedb. + properties: + maxMeasurementsPerDatabase: + format: int64 + minimum: 0 + type: integer + maxTagValuesPerMeasurement: + format: int64 + minimum: 0 + type: integer + type: object chdb: properties: poolSize: - default: 4 + default: 1 + description: |- + Ignored by hyperbytedb (libchdb is a process-global singleton). Retained + for API stability; always written as 1 in config.toml. format: int32 type: integer + sessionDataPath: + description: |- + chDB session directory inside the data volume. Defaults to + /var/lib/hyperbytedb/chdb when unset. + type: string type: object cluster: properties: - antiEntropyEnabled: - default: true - description: When false, hyperbytedb does not run periodic Merkle - verify / delta sync. - type: boolean - antiEntropyIntervalSecs: - default: 60 - format: int32 - type: integer heartbeatIntervalSecs: default: 2 format: int32 @@ -3001,13 +3011,78 @@ spec: default: 1000 format: int32 type: integer + replicateReceiverQueueDepth: + description: Bounded apply queue on the replicate receiver. + format: int32 + minimum: 0 + type: integer + replication: + description: Per-node, per-write replication mode and tuning. + properties: + ackTimeoutMs: + default: 5000 + description: |- + Worst-case latency budget (ms) for sync_quorum writes. On timeout the + coordinator returns 504; in-flight peer tasks keep running and unacked + peers fall back to hinted handoff. + format: int64 + minimum: 0 + type: integer + mode: + default: async + description: |- + Replication mode. "async" is fire-and-forget HTTP fan-out (default, + preserves today's behavior). "sync_quorum" awaits W-of-N peer acks + before returning to the client. + enum: + - async + - sync_quorum + type: string + syncQuorum: + description: SyncQuorumSpec configures the sync_quorum replication + mode. + properties: + minAcks: + anyOf: + - type: integer + - type: string + description: |- + Number of peer acks required for sync_quorum. Either the string + "majority" (resolved at request time against current active peers) or an + explicit integer count. The local WAL append always happens first, so + self-durability is implicit and the local node is never counted toward + the quorum. + x-kubernetes-int-or-string: true + type: object + type: object + replicationMaxCoalesceBodyBytes: + description: Max bytes for coalescing consecutive WAL batches + with the same db/rp/precision. + format: int64 + minimum: 0 + type: integer + replicationMaxInflightBatches: + description: Max concurrent outbound replication fan-out rounds + (token bucket). + format: int32 + minimum: 0 + type: integer replicationMaxRetries: default: 5 format: int32 type: integer - syncMaxConcurrentFiles: - default: 4 + replicationQueueDepth: + description: Bounded outbound replication queue depth (ingest-sized + batches). format: int32 + minimum: 0 + type: integer + replicationTruncateStalePeerMultiplier: + description: |- + When >0, peers with ack 0 and stale heartbeats (older than + heartbeatIntervalSecs * multiplier) are omitted from the WAL truncate barrier. + format: int64 + minimum: 0 type: integer tls: description: TLS for inter-node replication traffic. @@ -3037,24 +3112,6 @@ spec: - enabled type: object type: object - compaction: - properties: - enabled: - default: true - type: boolean - intervalSecs: - default: 300 - format: int32 - type: integer - minFilesToCompact: - default: 4 - format: int32 - type: integer - targetFileSizeMb: - default: 256 - format: int32 - type: integer - type: object failover: description: FailoverSpec controls automatic failure detection and recovery. @@ -3084,14 +3141,53 @@ spec: default: 10 format: int32 type: integer + maxPointsPerBatch: + default: 50000 + description: Max points per chDB insert batch (clamped server-side + to 10k–500k). + format: int32 + minimum: 0 + type: integer timeBucketDuration: default: 1h type: string + walBatchDelayUs: + description: 'WAL group-commit: max microseconds to wait for more + entries before flushing.' + format: int64 + minimum: 0 + type: integer + walBatchSize: + description: 'WAL group-commit: max entries to coalesce per write + batch. 0 = disabled.' + format: int32 + minimum: 0 + type: integer walSizeThresholdMb: default: 64 format: int32 type: integer type: object + hintedHandoff: + description: |- + HintedHandoffSpec configures the hinted-handoff queue used to retry writes + against peers that were temporarily unreachable. + properties: + enabled: + type: boolean + maxHintAgeSecs: + description: Hints older than this (seconds) are discarded on + drain. + format: int64 + minimum: 0 + type: integer + maxHintsPerPeer: + description: Maximum queued hints per unreachable peer before + oldest are dropped. + format: int64 + minimum: 0 + type: integer + type: object image: default: hyperbytedb:latest type: string @@ -3119,6 +3215,10 @@ spec: type: array logging: properties: + detailedTrace: + description: Emit per-phase performance logs (write/query/flush). + Off by default. + type: boolean format: default: text enum: @@ -3134,6 +3234,13 @@ spec: - warn - error type: string + otlpEndpoint: + description: OTLP HTTP endpoint for trace export (e.g. http://alloy-logs:4318). + type: string + otlpSampleRatio: + description: Fraction of traces exported to OTLP (0.0–1.0). Default + 1.0 when OTLP is set. + type: string type: object monitoring: properties: @@ -3157,6 +3264,193 @@ spec: additionalProperties: type: string type: object + proxy: + description: |- + Proxy deploys a stateless `hyperbytedb-proxy` Deployment in front of the + StatefulSet to absorb rolling restarts and drain events without + returning errors to clients. + properties: + enabled: + default: false + description: |- + When false, the operator does not create or reconcile any proxy + resources. Existing proxy Deployment/Service (if any) are left alone + so they can be cleaned up out-of-band. + type: boolean + healthPath: + description: |- + HTTP path used for backend health probes. Defaults to `/health`. + Set to `/health/ready` for the deeper chDB-aware readiness check. + type: string + holdTimeoutSecs: + default: 10 + description: |- + How long the proxy waits for a backend to come back before failing a + request with 503. Bigger values mean rolling restarts are smoother but + individual stuck requests sit longer. + format: int32 + minimum: 0 + type: integer + image: + default: hyperbytedb-proxy:latest + type: string + imagePullPolicy: + description: PullPolicy describes a policy for if/when to pull + a container image + type: string + imagePullSecrets: + items: + description: |- + LocalObjectReference contains enough information to let you locate the + referenced object inside the same namespace. + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + type: object + x-kubernetes-map-type: atomic + type: array + maxRetries: + default: 2 + description: Cap on per-backend retries for one request. 0 disables + retries. + format: int32 + minimum: 0 + type: integer + nodePort: + description: |- + Explicit nodePort when ServiceType=NodePort. Required for kind clusters + that pre-map a host port to a fixed nodePort. + format: int32 + maximum: 32767 + minimum: 30000 + type: integer + podAnnotations: + additionalProperties: + type: string + type: object + podLabels: + additionalProperties: + type: string + type: object + port: + description: |- + Port the proxy Service exposes. Defaults to the cluster server port so + existing clients can re-target the Service name with no port change. + format: int32 + maximum: 65535 + minimum: 1 + type: integer + replicas: + default: 2 + format: int32 + minimum: 1 + type: integer + requestTimeoutSecs: + description: |- + Per-request budget the proxy allows for the upstream call. Defaults + to ~ServerSpec.RequestTimeoutSecs. + format: int32 + minimum: 1 + type: integer + resources: + description: ResourceRequirements describes the compute resource + requirements. + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This field depends on the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + serviceType: + description: |- + Type of the proxy Service. Defaults to ClusterIP. Set NodePort/ + LoadBalancer to expose externally. + enum: + - ClusterIP + - NodePort + - LoadBalancer + type: string + shutdownGraceSecs: + default: 30 + description: |- + How long the proxy keeps serving in-flight requests after SIGTERM + before exiting. Should comfortably exceed the longest expected query. + format: int32 + minimum: 1 + type: integer + required: + - enabled + type: object + rateLimit: + description: RateLimitSpec controls per-endpoint request rate limiting. + properties: + enabled: + type: boolean + maxRequestsPerSecond: + description: Maximum requests per second per endpoint (/write, + /query). 0 = unlimited. + format: int64 + minimum: 0 + type: integer + type: object replicas: default: 1 format: int32 @@ -3221,12 +3515,30 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + retention: + description: RetentionSpec controls the background retention enforcement + loop. + properties: + enabled: + default: true + type: boolean + interval: + default: 60s + description: How often retention scans run (humantime duration, + e.g. "60s", "5m", "1h"). + type: string + type: object server: properties: maxBodySizeBytes: default: 26214400 format: int64 type: integer + maxConcurrentQueries: + description: Maximum concurrent /query requests. 0 = unlimited. + format: int32 + minimum: 0 + type: integer port: default: 8086 format: int32 @@ -3268,31 +3580,23 @@ spec: - enabled type: object type: object + statementSummary: + description: |- + StatementSummarySpec controls collection of per-statement execution stats + exposed via /debug/statement_summary. + properties: + enabled: + type: boolean + maxEntries: + description: |- + Maximum number of distinct statements tracked. Oldest entries are evicted + when the limit is exceeded. + format: int32 + minimum: 0 + type: integer + type: object storage: properties: - backend: - default: local - enum: - - local - - s3 - type: string - s3: - properties: - bucket: - type: string - credentialsSecretName: - description: Reference to a Secret containing access_key_id - and secret_access_key. - type: string - endpoint: - type: string - prefix: - type: string - region: - type: string - required: - - bucket - type: object volumeClaimTemplate: properties: size: @@ -3673,4 +3977,3 @@ spec: storage: true subresources: status: {} -{{- end }} diff --git a/dist/install.yaml b/dist/install.yaml index 66f0139..6b14655 100644 --- a/dist/install.yaml +++ b/dist/install.yaml @@ -3180,21 +3180,20 @@ spec: chdb: properties: poolSize: - default: 4 + default: 1 + description: |- + Ignored by hyperbytedb (libchdb is a process-global singleton). Retained + for API stability; always written as 1 in config.toml. format: int32 type: integer + sessionDataPath: + description: |- + chDB session directory inside the data volume. Defaults to + /var/lib/hyperbytedb/chdb when unset. + type: string type: object cluster: properties: - antiEntropyEnabled: - default: true - description: When false, hyperbytedb does not run periodic Merkle - verify / delta sync. - type: boolean - antiEntropyIntervalSecs: - default: 60 - format: int32 - type: integer heartbeatIntervalSecs: default: 2 format: int32 @@ -3288,10 +3287,6 @@ spec: format: int64 minimum: 0 type: integer - syncMaxConcurrentFiles: - default: 4 - format: int32 - type: integer tls: description: TLS for inter-node replication traffic. properties: @@ -3320,58 +3315,6 @@ spec: - enabled type: object type: object - compaction: - properties: - bucketDuration: - description: |- - Compaction time bucket. "1h" (hourly, default) or "1d" (daily). Daily - buckets merge all 24 hourly files, reducing file count for wide-range queries. - enum: - - 1h - - 1d - - 24h - type: string - compactAllMaxInflight: - description: Max concurrent measurement compactions when running - compact_all. - format: int32 - minimum: 0 - type: integer - enabled: - default: true - type: boolean - intervalSecs: - default: 300 - format: int32 - type: integer - maxRepairChecksPerCycle: - description: |- - Max bucket-hash comparisons (and repair attempts) per compaction tick for - membership self-repair. - format: int32 - minimum: 0 - type: integer - minFilesToCompact: - default: 4 - format: int32 - type: integer - selfRepairEnabled: - description: |- - When true, periodically compare each active peer's manifest against local - bucket hashes and fetch divergent or missing slices. - type: boolean - targetFileSizeMb: - default: 256 - format: int32 - type: integer - verifiedCompactionAgeSecs: - description: |- - Minimum age (seconds) before per-node files are hash-verified and merged - into a single compacted file. - format: int64 - minimum: 0 - type: integer - type: object failover: description: FailoverSpec controls automatic failure detection and recovery. @@ -3402,8 +3345,9 @@ spec: format: int32 type: integer maxPointsPerBatch: - description: Maximum points buffered per measurement before forcing - a flush. 0 = unlimited. + default: 50000 + description: Max points per chDB insert batch (clamped server-side + to 10k–500k). format: int32 minimum: 0 type: integer @@ -3474,6 +3418,10 @@ spec: type: array logging: properties: + detailedTrace: + description: Emit per-phase performance logs (write/query/flush). + Off by default. + type: boolean format: default: text enum: @@ -3489,6 +3437,13 @@ spec: - warn - error type: string + otlpEndpoint: + description: OTLP HTTP endpoint for trace export (e.g. http://alloy-logs:4318). + type: string + otlpSampleRatio: + description: Fraction of traces exported to OTLP (0.0–1.0). Default + 1.0 when OTLP is set. + type: string type: object monitoring: properties: @@ -3763,6 +3718,19 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + retention: + description: RetentionSpec controls the background retention enforcement + loop. + properties: + enabled: + default: true + type: boolean + interval: + default: 60s + description: How often retention scans run (humantime duration, + e.g. "60s", "5m", "1h"). + type: string + type: object server: properties: maxBodySizeBytes: @@ -3832,29 +3800,6 @@ spec: type: object storage: properties: - backend: - default: local - enum: - - local - - s3 - type: string - s3: - properties: - bucket: - type: string - credentialsSecretName: - description: Reference to a Secret containing access_key_id - and secret_access_key. - type: string - endpoint: - type: string - prefix: - type: string - region: - type: string - required: - - bucket - type: object volumeClaimTemplate: properties: size: diff --git a/internal/hyperbytedb/configmap.go b/internal/hyperbytedb/configmap.go index 2a2b9c1..b3f8c01 100644 --- a/internal/hyperbytedb/configmap.go +++ b/internal/hyperbytedb/configmap.go @@ -12,6 +12,14 @@ import ( v1alpha1 "github.com/hyperbyte-cloud/hyperbytedb-operator/api/v1alpha1" ) +const ( + defaultChdbSessionPath = "/var/lib/hyperbytedb/chdb" + defaultWalDir = "/var/lib/hyperbytedb/wal" + defaultMetaDir = "/var/lib/hyperbytedb/meta" + defaultRaftDir = "/var/lib/hyperbytedb/raft" + defaultReplLogDir = "/var/lib/hyperbytedb/replication_log" +) + func ConfigMapName(cluster *v1alpha1.HyperbytedbCluster) string { return cluster.Name + "-config" } @@ -42,11 +50,15 @@ func ConfigHash(cluster *v1alpha1.HyperbytedbCluster) string { } func renderConfigTOML(cluster *v1alpha1.HyperbytedbCluster) string { + return renderConfigTOMLWithClusterEnabled(cluster, clusterMetadataEnabled(cluster)) +} + +func clusterMetadataEnabled(cluster *v1alpha1.HyperbytedbCluster) bool { replicas := int32(1) if cluster.Spec.Replicas != nil { replicas = *cluster.Spec.Replicas } - return renderConfigTOMLWithClusterEnabled(cluster, replicas > 1) + return replicas > 1 } func renderConfigTOMLWithClusterEnabled(cluster *v1alpha1.HyperbytedbCluster, clusterEnabled bool) string { @@ -54,9 +66,8 @@ func renderConfigTOMLWithClusterEnabled(cluster *v1alpha1.HyperbytedbCluster, cl var b strings.Builder writeServerSection(&b, spec) - writeStorageSection(&b, spec) + writeStorageSection(&b) writeFlushSection(&b, spec) - writeCompactionSection(&b, spec) writeChdbSection(&b, spec) writeAuthSection(&b, spec) writeCardinalitySection(&b, spec) @@ -64,7 +75,8 @@ func renderConfigTOMLWithClusterEnabled(cluster *v1alpha1.HyperbytedbCluster, cl writeStatementSummarySection(&b, spec) writeHintedHandoffSection(&b, spec) writeRateLimitSection(&b, spec) - writeClusterSection(&b, spec, clusterEnabled) + writeRetentionSection(&b, spec) + writeClusterSection(&b, cluster, clusterEnabled) return b.String() } @@ -86,43 +98,20 @@ func writeServerSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpe if spec.Server.QueryTimeoutSecs > 0 { fmt.Fprintf(b, "query_timeout_secs = %d\n", spec.Server.QueryTimeoutSecs) } - // 0 means unlimited; emit only when caller explicitly set a positive limit. if spec.Server.MaxConcurrentQueries > 0 { fmt.Fprintf(b, "max_concurrent_queries = %d\n", spec.Server.MaxConcurrentQueries) } - if spec.Server.TLS != nil && spec.Server.TLS.Enabled { - b.WriteString("\n[server.tls]\n") - b.WriteString("enabled = true\n") - b.WriteString("cert_file = \"/etc/hyperbytedb/tls/tls.crt\"\n") - b.WriteString("key_file = \"/etc/hyperbytedb/tls/tls.key\"\n") + b.WriteString("tls_enabled = true\n") + b.WriteString("tls_cert_path = \"/etc/hyperbytedb/tls/tls.crt\"\n") + b.WriteString("tls_key_path = \"/etc/hyperbytedb/tls/tls.key\"\n") } } -func writeStorageSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { +func writeStorageSection(b *strings.Builder) { b.WriteString("\n[storage]\n") - b.WriteString("data_dir = \"/var/lib/hyperbytedb/data\"\n") - b.WriteString("wal_dir = \"/var/lib/hyperbytedb/wal\"\n") - b.WriteString("meta_dir = \"/var/lib/hyperbytedb/meta\"\n") - backend := "local" - if spec.Storage.Backend != "" { - backend = spec.Storage.Backend - } - fmt.Fprintf(b, "backend = \"%s\"\n", backend) - - if spec.Storage.S3 != nil { - b.WriteString("\n[storage.s3]\n") - fmt.Fprintf(b, "bucket = \"%s\"\n", spec.Storage.S3.Bucket) - if spec.Storage.S3.Prefix != "" { - fmt.Fprintf(b, "prefix = \"%s\"\n", spec.Storage.S3.Prefix) - } - if spec.Storage.S3.Region != "" { - fmt.Fprintf(b, "region = \"%s\"\n", spec.Storage.S3.Region) - } - if spec.Storage.S3.Endpoint != "" { - fmt.Fprintf(b, "endpoint = \"%s\"\n", spec.Storage.S3.Endpoint) - } - } + fmt.Fprintf(b, "wal_dir = \"%s\"\n", defaultWalDir) + fmt.Fprintf(b, "meta_dir = \"%s\"\n", defaultMetaDir) } func writeFlushSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { @@ -142,9 +131,11 @@ func writeFlushSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec timeBucket = spec.Flush.TimeBucketDuration } fmt.Fprintf(b, "time_bucket_duration = \"%s\"\n", timeBucket) + maxPointsPerBatch := int32(50000) if spec.Flush.MaxPointsPerBatch > 0 { - fmt.Fprintf(b, "max_points_per_batch = %d\n", spec.Flush.MaxPointsPerBatch) + maxPointsPerBatch = spec.Flush.MaxPointsPerBatch } + fmt.Fprintf(b, "max_points_per_batch = %d\n", maxPointsPerBatch) if spec.Flush.WALBatchSize > 0 { fmt.Fprintf(b, "wal_batch_size = %d\n", spec.Flush.WALBatchSize) } @@ -153,49 +144,14 @@ func writeFlushSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec } } -func writeCompactionSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { - b.WriteString("\n[compaction]\n") - compactionEnabled := true - if spec.Compaction.Enabled != nil { - compactionEnabled = *spec.Compaction.Enabled - } - fmt.Fprintf(b, "enabled = %t\n", compactionEnabled) - compactionInterval := int32(300) - if spec.Compaction.IntervalSecs > 0 { - compactionInterval = spec.Compaction.IntervalSecs - } - fmt.Fprintf(b, "interval_secs = %d\n", compactionInterval) - minFiles := int32(4) - if spec.Compaction.MinFilesToCompact > 0 { - minFiles = spec.Compaction.MinFilesToCompact - } - fmt.Fprintf(b, "min_files_to_compact = %d\n", minFiles) - targetSize := int32(256) - if spec.Compaction.TargetFileSizeMB > 0 { - targetSize = spec.Compaction.TargetFileSizeMB - } - fmt.Fprintf(b, "target_file_size_mb = %d\n", targetSize) - if spec.Compaction.BucketDuration != "" { - fmt.Fprintf(b, "bucket_duration = \"%s\"\n", spec.Compaction.BucketDuration) - } - if spec.Compaction.VerifiedCompactionAgeSecs > 0 { - fmt.Fprintf(b, "verified_compaction_age_secs = %d\n", spec.Compaction.VerifiedCompactionAgeSecs) - } - if spec.Compaction.SelfRepairEnabled != nil { - fmt.Fprintf(b, "self_repair_enabled = %t\n", *spec.Compaction.SelfRepairEnabled) - } - if spec.Compaction.MaxRepairChecksPerCycle > 0 { - fmt.Fprintf(b, "max_repair_checks_per_cycle = %d\n", spec.Compaction.MaxRepairChecksPerCycle) - } - if spec.Compaction.CompactAllMaxInflight > 0 { - fmt.Fprintf(b, "compact_all_max_inflight = %d\n", spec.Compaction.CompactAllMaxInflight) - } -} - func writeChdbSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { b.WriteString("\n[chdb]\n") - b.WriteString("session_data_path = \"/var/lib/hyperbytedb/chdb\"\n") - poolSize := int32(4) + sessionPath := defaultChdbSessionPath + if spec.ChDB.SessionDataPath != "" { + sessionPath = spec.ChDB.SessionDataPath + } + fmt.Fprintf(b, "session_data_path = \"%s\"\n", sessionPath) + poolSize := int32(1) if spec.ChDB.PoolSize > 0 { poolSize = spec.ChDB.PoolSize } @@ -208,16 +164,17 @@ func writeAuthSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) } func writeCardinalitySection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { - if spec.Cardinality.MaxTagValuesPerMeasurement == 0 && spec.Cardinality.MaxMeasurementsPerDatabase == 0 { - return - } - b.WriteString("\n[cardinality]\n") + maxTag := int64(100_000) if spec.Cardinality.MaxTagValuesPerMeasurement > 0 { - fmt.Fprintf(b, "max_tag_values_per_measurement = %d\n", spec.Cardinality.MaxTagValuesPerMeasurement) + maxTag = spec.Cardinality.MaxTagValuesPerMeasurement } + maxMeas := int64(10_000) if spec.Cardinality.MaxMeasurementsPerDatabase > 0 { - fmt.Fprintf(b, "max_measurements_per_database = %d\n", spec.Cardinality.MaxMeasurementsPerDatabase) + maxMeas = spec.Cardinality.MaxMeasurementsPerDatabase } + b.WriteString("\n[cardinality]\n") + fmt.Fprintf(b, "max_tag_values_per_measurement = %d\n", maxTag) + fmt.Fprintf(b, "max_measurements_per_database = %d\n", maxMeas) } func writeLoggingSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { @@ -232,35 +189,48 @@ func writeLoggingSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSp format = spec.Logging.Format } fmt.Fprintf(b, "format = \"%s\"\n", format) + if spec.Logging.DetailedTrace != nil { + fmt.Fprintf(b, "detailed_trace = %t\n", *spec.Logging.DetailedTrace) + } + if spec.Logging.OtlpEndpoint != "" { + fmt.Fprintf(b, "otlp_endpoint = \"%s\"\n", spec.Logging.OtlpEndpoint) + } + if spec.Logging.OtlpSampleRatio != "" { + fmt.Fprintf(b, "otlp_sample_ratio = %s\n", spec.Logging.OtlpSampleRatio) + } } func writeStatementSummarySection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { - if spec.StatementSummary.Enabled == nil && spec.StatementSummary.MaxEntries == 0 { - return - } - b.WriteString("\n[statement_summary]\n") + enabled := true if spec.StatementSummary.Enabled != nil { - fmt.Fprintf(b, "enabled = %t\n", *spec.StatementSummary.Enabled) + enabled = *spec.StatementSummary.Enabled } + maxEntries := int32(1000) if spec.StatementSummary.MaxEntries > 0 { - fmt.Fprintf(b, "max_entries = %d\n", spec.StatementSummary.MaxEntries) + maxEntries = spec.StatementSummary.MaxEntries } + b.WriteString("\n[statement_summary]\n") + fmt.Fprintf(b, "enabled = %t\n", enabled) + fmt.Fprintf(b, "max_entries = %d\n", maxEntries) } func writeHintedHandoffSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { - if spec.HintedHandoff.Enabled == nil && spec.HintedHandoff.MaxHintsPerPeer == 0 && spec.HintedHandoff.MaxHintAgeSecs == 0 { - return - } - b.WriteString("\n[hinted_handoff]\n") + enabled := true if spec.HintedHandoff.Enabled != nil { - fmt.Fprintf(b, "enabled = %t\n", *spec.HintedHandoff.Enabled) + enabled = *spec.HintedHandoff.Enabled } + maxHints := int64(100_000) if spec.HintedHandoff.MaxHintsPerPeer > 0 { - fmt.Fprintf(b, "max_hints_per_peer = %d\n", spec.HintedHandoff.MaxHintsPerPeer) + maxHints = spec.HintedHandoff.MaxHintsPerPeer } + maxAge := int64(3600) if spec.HintedHandoff.MaxHintAgeSecs > 0 { - fmt.Fprintf(b, "max_hint_age_secs = %d\n", spec.HintedHandoff.MaxHintAgeSecs) + maxAge = spec.HintedHandoff.MaxHintAgeSecs } + b.WriteString("\n[hinted_handoff]\n") + fmt.Fprintf(b, "enabled = %t\n", enabled) + fmt.Fprintf(b, "max_hints_per_peer = %d\n", maxHints) + fmt.Fprintf(b, "max_hint_age_secs = %d\n", maxAge) } func writeRateLimitSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { @@ -276,10 +246,27 @@ func writeRateLimitSection(b *strings.Builder, spec *v1alpha1.HyperbytedbCluster } } -func writeClusterSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec, clusterEnabled bool) { +func writeRetentionSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSpec) { + enabled := true + if spec.Retention.Enabled != nil { + enabled = *spec.Retention.Enabled + } + interval := "60s" + if spec.Retention.Interval != "" { + interval = spec.Retention.Interval + } + b.WriteString("\n[retention]\n") + fmt.Fprintf(b, "enabled = %t\n", enabled) + fmt.Fprintf(b, "interval = \"%s\"\n", interval) +} + +func writeClusterSection(b *strings.Builder, cluster *v1alpha1.HyperbytedbCluster, clusterEnabled bool) { + spec := &cluster.Spec b.WriteString("\n[cluster]\n") fmt.Fprintf(b, "enabled = %t\n", clusterEnabled) - b.WriteString("replication_log_dir = \"/var/lib/hyperbytedb/replication_log\"\n") + b.WriteString("peers = \"\"\n") + b.WriteString(fmt.Sprintf("replication_log_dir = \"%s\"\n", defaultReplLogDir)) + b.WriteString(fmt.Sprintf("raft_dir = \"%s\"\n", defaultRaftDir)) heartbeatInterval := int32(2) if spec.Cluster.HeartbeatIntervalSecs > 0 { @@ -293,63 +280,51 @@ func writeClusterSection(b *strings.Builder, spec *v1alpha1.HyperbytedbClusterSp } fmt.Fprintf(b, "heartbeat_miss_threshold = %d\n", heartbeatMiss) - aeEnabled := true - if spec.Cluster.AntiEntropyEnabled != nil { - aeEnabled = *spec.Cluster.AntiEntropyEnabled - } - fmt.Fprintf(b, "anti_entropy_enabled = %t\n", aeEnabled) - - aeInterval := int32(60) - if spec.Cluster.AntiEntropyIntervalSecs > 0 { - aeInterval = spec.Cluster.AntiEntropyIntervalSecs - } - fmt.Fprintf(b, "anti_entropy_interval_secs = %d\n", aeInterval) - - syncFiles := int32(4) - if spec.Cluster.SyncMaxConcurrentFiles > 0 { - syncFiles = spec.Cluster.SyncMaxConcurrentFiles - } - fmt.Fprintf(b, "sync_max_concurrent_files = %d\n", syncFiles) - replRetries := int32(5) if spec.Cluster.ReplicationMaxRetries > 0 { replRetries = spec.Cluster.ReplicationMaxRetries } fmt.Fprintf(b, "replication_max_retries = %d\n", replRetries) + replQueue := int32(8192) if spec.Cluster.ReplicationQueueDepth > 0 { - fmt.Fprintf(b, "replication_queue_depth = %d\n", spec.Cluster.ReplicationQueueDepth) + replQueue = spec.Cluster.ReplicationQueueDepth } + fmt.Fprintf(b, "replication_queue_depth = %d\n", replQueue) + + replInflight := int32(8) if spec.Cluster.ReplicationMaxInflightBatches > 0 { - fmt.Fprintf(b, "replication_max_inflight_batches = %d\n", spec.Cluster.ReplicationMaxInflightBatches) + replInflight = spec.Cluster.ReplicationMaxInflightBatches } + fmt.Fprintf(b, "replication_max_inflight_batches = %d\n", replInflight) + + replCoalesce := int64(8 * 1024 * 1024) if spec.Cluster.ReplicationMaxCoalesceBodyBytes > 0 { - fmt.Fprintf(b, "replication_max_coalesce_body_bytes = %d\n", spec.Cluster.ReplicationMaxCoalesceBodyBytes) + replCoalesce = spec.Cluster.ReplicationMaxCoalesceBodyBytes } + fmt.Fprintf(b, "replication_max_coalesce_body_bytes = %d\n", replCoalesce) + + recvQueue := int32(1024) if spec.Cluster.ReplicateReceiverQueueDepth > 0 { - fmt.Fprintf(b, "replicate_receiver_queue_depth = %d\n", spec.Cluster.ReplicateReceiverQueueDepth) + recvQueue = spec.Cluster.ReplicateReceiverQueueDepth } + fmt.Fprintf(b, "replicate_receiver_queue_depth = %d\n", recvQueue) + + truncateMult := int64(2) if spec.Cluster.ReplicationTruncateStalePeerMultiplier > 0 { - fmt.Fprintf(b, "replication_truncate_stale_peer_multiplier = %d\n", spec.Cluster.ReplicationTruncateStalePeerMultiplier) + truncateMult = spec.Cluster.ReplicationTruncateStalePeerMultiplier } + fmt.Fprintf(b, "replication_truncate_stale_peer_multiplier = %d\n", truncateMult) - raftHB := int32(300) if spec.Cluster.RaftHeartbeatIntervalMs > 0 { - raftHB = spec.Cluster.RaftHeartbeatIntervalMs + fmt.Fprintf(b, "raft_heartbeat_interval_ms = %d\n", spec.Cluster.RaftHeartbeatIntervalMs) } - fmt.Fprintf(b, "raft_heartbeat_interval_ms = %d\n", raftHB) - - raftElection := int32(1000) if spec.Cluster.RaftElectionTimeoutMs > 0 { - raftElection = spec.Cluster.RaftElectionTimeoutMs + fmt.Fprintf(b, "raft_election_timeout_ms = %d\n", spec.Cluster.RaftElectionTimeoutMs) } - fmt.Fprintf(b, "raft_election_timeout_ms = %d\n", raftElection) - - raftSnapshot := int32(1000) if spec.Cluster.RaftSnapshotThreshold > 0 { - raftSnapshot = spec.Cluster.RaftSnapshotThreshold + fmt.Fprintf(b, "raft_snapshot_threshold = %d\n", spec.Cluster.RaftSnapshotThreshold) } - fmt.Fprintf(b, "raft_snapshot_threshold = %d\n", raftSnapshot) writeReplicationSubsections(b, spec.Cluster.Replication) } @@ -378,7 +353,6 @@ func writeMinAcks(b *strings.Builder, v *intstr.IntOrString) { case intstr.Int: fmt.Fprintf(b, "min_acks = %d\n", v.IntValue()) case intstr.String: - // Quote string forms (e.g. "majority") so TOML treats them as strings. fmt.Fprintf(b, "min_acks = \"%s\"\n", v.StrVal) } } diff --git a/internal/hyperbytedb/configmap_test.go b/internal/hyperbytedb/configmap_test.go new file mode 100644 index 0000000..83c7dc2 --- /dev/null +++ b/internal/hyperbytedb/configmap_test.go @@ -0,0 +1,108 @@ +package hyperbytedb + +import ( + "strings" + "testing" + + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" + + v1alpha1 "github.com/hyperbyte-cloud/hyperbytedb-operator/api/v1alpha1" +) + +func TestRenderConfigTOML_singleNode(t *testing.T) { + cluster := &v1alpha1.HyperbytedbCluster{ + Spec: v1alpha1.HyperbytedbClusterSpec{ + Replicas: ptr.To(int32(1)), + Server: v1alpha1.ServerSpec{ + Port: 9090, + }, + }, + } + + out := renderConfigTOML(cluster) + + for _, want := range []string{ + "[server]", + "port = 9090", + "[storage]", + `wal_dir = "/var/lib/hyperbytedb/wal"`, + `meta_dir = "/var/lib/hyperbytedb/meta"`, + "[cluster]", + "enabled = false", + "[retention]", + `interval = "60s"`, + } { + if !strings.Contains(out, want) { + t.Fatalf("expected config to contain %q\n\ngot:\n%s", want, out) + } + } + + for _, absent := range []string{ + "[compaction]", + "[server.tls]", + "data_dir", + "anti_entropy", + "wal_replication", + } { + if strings.Contains(out, absent) { + t.Fatalf("expected config NOT to contain %q\n\ngot:\n%s", absent, out) + } + } +} + +func TestRenderConfigTOML_clusterAndTLS(t *testing.T) { + cluster := &v1alpha1.HyperbytedbCluster{ + Spec: v1alpha1.HyperbytedbClusterSpec{ + Replicas: ptr.To(int32(3)), + Server: v1alpha1.ServerSpec{ + TLS: &v1alpha1.TLSSpec{Enabled: true}, + }, + Cluster: v1alpha1.ClusterTuningSpec{ + Replication: &v1alpha1.ReplicationSpec{ + Mode: "sync_quorum", + AckTimeoutMs: 3000, + SyncQuorum: &v1alpha1.SyncQuorumSpec{ + MinAcks: ptr.To(intstr.FromString("majority")), + }, + }, + }, + Retention: v1alpha1.RetentionSpec{ + Enabled: ptr.To(false), + Interval: "5m", + }, + }, + } + + out := renderConfigTOML(cluster) + + for _, want := range []string{ + "enabled = true", + "tls_enabled = true", + `tls_cert_path = "/etc/hyperbytedb/tls/tls.crt"`, + `tls_key_path = "/etc/hyperbytedb/tls/tls.key"`, + `mode = "sync_quorum"`, + "ack_timeout_ms = 3000", + `min_acks = "majority"`, + "enabled = false", + `interval = "5m"`, + } { + if !strings.Contains(out, want) { + t.Fatalf("expected config to contain %q\n\ngot:\n%s", want, out) + } + } +} + +func TestConfigHash_ignoresReplicaCount(t *testing.T) { + base := v1alpha1.HyperbytedbClusterSpec{ + Server: v1alpha1.ServerSpec{Port: 8086}, + } + one := &v1alpha1.HyperbytedbCluster{Spec: base} + one.Spec.Replicas = ptr.To(int32(1)) + three := &v1alpha1.HyperbytedbCluster{Spec: base} + three.Spec.Replicas = ptr.To(int32(3)) + + if ConfigHash(one) != ConfigHash(three) { + t.Fatal("ConfigHash must not change when only replica count changes") + } +} diff --git a/internal/hyperbytedb/statefulset.go b/internal/hyperbytedb/statefulset.go index b46efb1..ae4dfe0 100644 --- a/internal/hyperbytedb/statefulset.go +++ b/internal/hyperbytedb/statefulset.go @@ -129,47 +129,22 @@ exec hyperbytedb --config /etc/hyperbytedb/config.toml serve } // --------------- env vars --------------- + chdbPath := defaultChdbSessionPath + if cluster.Spec.ChDB.SessionDataPath != "" { + chdbPath = cluster.Spec.ChDB.SessionDataPath + } env := []corev1.EnvVar{ - {Name: "HYPERBYTEDB__STORAGE__DATA_DIR", Value: "/var/lib/hyperbytedb/data"}, - {Name: "HYPERBYTEDB__STORAGE__WAL_DIR", Value: "/var/lib/hyperbytedb/wal"}, - {Name: "HYPERBYTEDB__STORAGE__META_DIR", Value: "/var/lib/hyperbytedb/meta"}, - {Name: "HYPERBYTEDB__CHDB__SESSION_DATA_PATH", Value: "/var/lib/hyperbytedb/chdb"}, + {Name: "HYPERBYTEDB__STORAGE__WAL_DIR", Value: defaultWalDir}, + {Name: "HYPERBYTEDB__STORAGE__META_DIR", Value: defaultMetaDir}, + {Name: "HYPERBYTEDB__CHDB__SESSION_DATA_PATH", Value: chdbPath}, + } + if cluster.Spec.Logging.OtlpEndpoint != "" { + env = append(env, corev1.EnvVar{Name: "OTEL_SERVICE_NAME", Value: cluster.Name}) } // Cluster mode and paths come from mounted config.toml (hot-updated on scale); // avoid replica-dependent env vars so the pod template stays stable across scale events. - if cluster.Spec.Storage.S3 != nil && cluster.Spec.Storage.S3.CredentialsSecretName != "" { - env = append(env, - corev1.EnvVar{ - Name: "HYPERBYTEDB__STORAGE__S3__ACCESS_KEY_ID", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Spec.Storage.S3.CredentialsSecretName}, - Key: "access_key_id", - }, - }, - }, - corev1.EnvVar{ - Name: "HYPERBYTEDB__STORAGE__S3__SECRET_ACCESS_KEY", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Spec.Storage.S3.CredentialsSecretName}, - Key: "secret_access_key", - }, - }, - }, - ) - } - - if cluster.Spec.Server.TLS != nil && cluster.Spec.Server.TLS.Enabled { - env = append(env, - corev1.EnvVar{Name: "HYPERBYTEDB__SERVER__TLS__ENABLED", Value: "true"}, - corev1.EnvVar{Name: "HYPERBYTEDB__SERVER__TLS__CERT_FILE", Value: "/etc/hyperbytedb/tls/tls.crt"}, - corev1.EnvVar{Name: "HYPERBYTEDB__SERVER__TLS__KEY_FILE", Value: "/etc/hyperbytedb/tls/tls.key"}, - ) - } - // --------------- volume mounts --------------- volumeMounts := []corev1.VolumeMount{ {Name: "data", MountPath: "/var/lib/hyperbytedb"},