diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..ff23063 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Run rustfmt and clippy before each commit (same checks as CI). +# +# Install once per clone: +# sh scripts/install-git-hooks.sh +# +# Bypass for a single commit: +# git commit --no-verify +set -euo pipefail + +REPO_ROOT="$(git rev-parse --show-toplevel)" +cd "${REPO_ROOT}" + +if [[ -f "${HOME}/.cargo/env" ]]; then + # shellcheck disable=SC1091 + source "${HOME}/.cargo/env" +fi + +if ! command -v cargo >/dev/null 2>&1; then + echo "pre-commit: cargo not found on PATH (install Rust or source ~/.cargo/env)" >&2 + exit 1 +fi + +CHDB_RUST_PATH="$(dirname "${REPO_ROOT}")/chdb-rust" +if [[ ! -f "${CHDB_RUST_PATH}/Cargo.toml" ]]; then + echo "pre-commit: checking out chdb-rust dependency..." + bash "${REPO_ROOT}/scripts/checkout-chdb-rust.sh" +fi + +rustup component add rustfmt clippy >/dev/null 2>&1 || true + +echo "pre-commit: cargo fmt --check" +if ! cargo fmt --check; then + echo "pre-commit: formatting failed; run 'cargo fmt' and restage" >&2 + exit 1 +fi + +echo "pre-commit: cargo clippy --all-targets -- -D warnings" +if ! cargo clippy --all-targets -- -D warnings; then + echo "pre-commit: clippy failed; fix warnings before committing" >&2 + exit 1 +fi diff --git a/.gitignore b/.gitignore index 05ed780..65a92e6 100644 --- a/.gitignore +++ b/.gitignore @@ -113,4 +113,5 @@ Desktop.ini .cursorrules .cursor/ hyperbytedb-operator -influx-multiplay \ No newline at end of file +influx-multiplay +chdb-rust \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 7e115eb..77bd22b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1909,6 +1909,7 @@ dependencies = [ "hyper-util", "hyperbytedb", "hyperlocal", + "parking_lot", "percent-encoding", "reqwest 0.12.28", "rpassword", diff --git a/Dockerfile b/Dockerfile index 1b71708..245abaf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ FROM debian:bookworm-slim AS builder RUN apt-get update && apt-get install -y --no-install-recommends \ curl ca-certificates build-essential pkg-config \ clang llvm-dev libclang-dev \ - pkg-config libssl-dev \ + pkg-config libssl-dev git \ && rm -rf /var/lib/apt/lists/* # Install rustup itself, but not a toolchain — the actual rustc/cargo version @@ -26,9 +26,11 @@ RUN curl -sL https://lib.chdb.io | bash # docker build -f hyperbytedb/Dockerfile WORKDIR /build -# chdb-rust path dependency. Copied to /build/chdb-rust so `../../chdb-rust` -# from /build/hyperbytedb/hyperbytedb resolves correctly. -COPY chdb-rust /build/chdb-rust +# chdb-rust path dependency (`../../chdb-rust` from hyperbytedb/hyperbytedb). +# Clone during the image build so `docker compose up` works without a local checkout. +ARG CHDB_RUST_REPO=https://github.com/hyperbyte-cloud/chdb-rust.git +ARG CHDB_RUST_REF=feat_arrow_insert +RUN git clone --depth 1 --branch "${CHDB_RUST_REF}" "${CHDB_RUST_REPO}" /build/chdb-rust # Pinned toolchain spec must be in place before any cargo invocation so # rustup auto-installs the right version on first use. diff --git a/hyperbytedb-cli/Cargo.toml b/hyperbytedb-cli/Cargo.toml index 319e36f..d60a6ec 100644 --- a/hyperbytedb-cli/Cargo.toml +++ b/hyperbytedb-cli/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde_urlencoded = "0.7" rustyline = { version = "15", features = ["derive"] } +parking_lot = "0.12" comfy-table = "7" anyhow = "1" thiserror = "2" diff --git a/hyperbytedb-cli/src/output/mod.rs b/hyperbytedb-cli/src/output/mod.rs index f0e20f9..f124789 100644 --- a/hyperbytedb-cli/src/output/mod.rs +++ b/hyperbytedb-cli/src/output/mod.rs @@ -1,9 +1,31 @@ -use comfy_table::{Cell, Table}; +use std::collections::HashMap; +use std::io::IsTerminal; + +use comfy_table::presets::UTF8_NO_BORDERS; +use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table}; use serde_json::Value; use crate::client::QueryResponse; use crate::session::OutputFormat; +struct DisplayStyle { + color: bool, +} + +impl DisplayStyle { + fn detect() -> Self { + Self { + color: stdout_supports_color(), + } + } +} + +fn stdout_supports_color() -> bool { + std::io::stdout().is_terminal() + && std::env::var("NO_COLOR").is_err() + && std::env::var("HYPERBYTEDB_NO_COLOR").is_err() +} + pub fn format_response(response: &QueryResponse, format: OutputFormat, pretty: bool) -> String { match format { OutputFormat::Json => format_json(response, pretty), @@ -26,52 +48,184 @@ pub fn format_json(response: &QueryResponse, pretty: bool) -> String { } pub fn format_column(response: &QueryResponse) -> String { + let style = DisplayStyle::detect(); let mut out = String::new(); - for result in &response.results { + + for (result_idx, result) in response.results.iter().enumerate() { + if result_idx > 0 { + out.push('\n'); + } + if let Some(ref err) = result.error { - out.push_str(&format!("ERR: {err}\n")); + out.push_str(&format_error(err, &style)); continue; } + let Some(ref series_list) = result.series else { continue; }; + if series_list.is_empty() { - out.push('\n'); + out.push_str(&format_empty_result(&style)); continue; } - for series in series_list { - if !series.name.is_empty() { - out.push_str(&format!("name: {}\n", series.name)); - } - if let Some(ref tags) = series.tags - && !tags.is_empty() - { - let tag_str: Vec = tags.iter().map(|(k, v)| format!("{k}={v}")).collect(); - out.push_str(&format!("tags: {}\n", tag_str.join(", "))); + + for (series_idx, series) in series_list.iter().enumerate() { + if series_idx > 0 { + out.push('\n'); } + out.push_str(&format_series_header( + &series.name, + series.tags.as_ref(), + &style, + )); + out.push('\n'); + let mut table = Table::new(); - table.set_header(series.columns.iter().map(|c| c.as_str())); + table + .load_preset(UTF8_NO_BORDERS) + .set_content_arrangement(ContentArrangement::Dynamic); + table.set_header( + series + .columns + .iter() + .map(|column| styled_column_header(column, &style)), + ); for row in &series.values { - table.add_row(row.iter().map(value_cell)); + table.add_row( + series + .columns + .iter() + .zip(row.iter()) + .map(|(column, value)| value_cell(value, column, &style)), + ); } - out.push_str(&format!("{table}\n")); + out.push_str(&format!("{table}")); + if series.partial == Some(true) { - out.push_str( - "(partial results: response was truncated — increase chunk size or narrow the query)\n", - ); + out.push('\n'); + out.push_str(&format_partial_notice(&style)); } } } + + if !out.is_empty() && !out.ends_with('\n') { + out.push('\n'); + } + out +} + +fn format_series_header( + name: &str, + tags: Option<&HashMap>, + style: &DisplayStyle, +) -> String { + let mut out = String::new(); + if style.color { + out.push_str("\x1b[1;34m"); + } + if name.is_empty() { + out.push('·'); + } else { + out.push_str(name); + } + if style.color { + out.push_str("\x1b[0m"); + } + + if let Some(tags) = tags + && !tags.is_empty() + { + let mut pairs: Vec<_> = tags.iter().collect(); + pairs.sort_by(|(a, _), (b, _)| a.cmp(b)); + if style.color { + out.push_str("\x1b[2;36m "); + } else { + out.push_str(" "); + } + let rendered: Vec = pairs + .into_iter() + .map(|(key, value)| { + if style.color { + format!("\x1b[1;36m{key}\x1b[0m\x1b[2;36m={value}\x1b[0m") + } else { + format!("{key}={value}") + } + }) + .collect(); + out.push_str(&rendered.join(", ")); + if style.color { + out.push_str("\x1b[0m"); + } + } out } -fn value_cell(v: &Value) -> Cell { - match v { - Value::Null => Cell::new(""), - Value::String(s) => Cell::new(s), - Value::Number(n) => Cell::new(n.to_string()), - Value::Bool(b) => Cell::new(b.to_string()), - other => Cell::new(other.to_string()), +fn styled_column_header(column: &str, style: &DisplayStyle) -> Cell { + let mut cell = Cell::new(column); + if style.color { + cell = cell.fg(Color::Cyan).add_attribute(Attribute::Bold); + } + cell +} + +fn value_cell(value: &Value, column: &str, style: &DisplayStyle) -> Cell { + let text = value_text(value); + let mut cell = Cell::new(&text); + + if !style.color { + return cell; + } + + match value { + Value::Null => cell = cell.fg(Color::DarkGrey), + Value::Bool(_) => cell = cell.fg(Color::Yellow), + Value::Number(_) => cell = cell.fg(Color::Green), + Value::String(_) if is_time_column(column) => cell = cell.fg(Color::Magenta), + Value::String(_) => {} + _ => cell = cell.fg(Color::Grey), + } + + cell +} + +fn is_time_column(column: &str) -> bool { + column.eq_ignore_ascii_case("time") || column.ends_with("_time") +} + +fn value_text(value: &Value) -> String { + match value { + Value::Null => String::new(), + Value::String(s) => s.clone(), + Value::Number(n) => n.to_string(), + Value::Bool(b) => b.to_string(), + other => other.to_string(), + } +} + +fn format_error(err: &str, style: &DisplayStyle) -> String { + if style.color { + format!("\x1b[1;31m✗ {err}\x1b[0m\n") + } else { + format!("ERR: {err}\n") + } +} + +fn format_empty_result(style: &DisplayStyle) -> String { + if style.color { + "\x1b[2m(no results)\x1b[0m\n".to_string() + } else { + "(no results)\n".to_string() + } +} + +fn format_partial_notice(style: &DisplayStyle) -> String { + let message = + "(partial results: response was truncated — increase chunk size or narrow the query)"; + if style.color { + format!("\x1b[33m{message}\x1b[0m\n") + } else { + format!("{message}\n") } } @@ -98,6 +252,33 @@ mod tests { let out = format_column(&resp); assert!(out.contains("mydb")); assert!(out.contains("name")); + assert!(!out.contains("││")); + } + + #[test] + fn column_format_includes_series_header() { + let mut tags = HashMap::new(); + tags.insert("host".to_string(), "srv1".to_string()); + let resp = QueryResponse { + results: vec![StatementResult { + statement_id: 1, + series: Some(vec![SeriesResult { + name: "cpu".to_string(), + tags: Some(tags), + columns: vec!["time".to_string(), "value".to_string()], + values: vec![vec![ + Value::String("2024-01-01T00:00:00Z".to_string()), + Value::Number(42.into()), + ]], + partial: None, + }]), + error: None, + }], + }; + let out = format_column(&resp); + assert!(out.contains("cpu")); + assert!(out.contains("host=srv1")); + assert!(out.contains("42")); } #[test] diff --git a/hyperbytedb-cli/src/repl/complete.rs b/hyperbytedb-cli/src/repl/complete.rs new file mode 100644 index 0000000..bbc69bd --- /dev/null +++ b/hyperbytedb-cli/src/repl/complete.rs @@ -0,0 +1,443 @@ +use std::sync::Arc; + +use parking_lot::RwLock; +use rustyline::completion::{Completer, Pair}; +use rustyline::highlight::Highlighter; +use rustyline::hint::Hinter; +use rustyline::validate::Validator; +use rustyline::{Context, Helper, Result as RustyResult}; + +use crate::client::{HyperbytedbClient, QueryOptions, QueryResponse}; +use crate::session::OutputFormat; + +const META_COMMANDS: &[&str] = &[ + "auth", + "chunk", + "clear", + "connect", + "consistency", + "exit", + "format", + "help", + "history", + "insert", + "precision", + "pretty", + "quit", + "settings", + "timing", + "use", +]; + +const SQL_VERBS: &[&str] = &[ + "ALTER", "CREATE", "DELETE", "DROP", "EXPLAIN", "GRANT", "REVOKE", "SELECT", "SHOW", +]; + +const FORMAT_VALUES: &[&str] = &["column", "csv", "json"]; +const CLEAR_TARGETS: &[&str] = &["database", "db", "retention", "rp"]; +const CONSISTENCY_VALUES: &[&str] = &["all", "any", "one", "quorum"]; +const PRECISION_VALUES: &[&str] = &["h", "m", "ms", "ns", "s", "u"]; +const SHOW_OBJECTS: &[&str] = &[ + "CONTINUOUS", + "DATABASES", + "FIELD", + "MATERIALIZED", + "MEASUREMENTS", + "RETENTION", + "SERIES", + "TAG", + "USERS", +]; +const TAG_SUBCOMMANDS: &[&str] = &["KEYS", "VALUES"]; +const SHOW_MODIFIERS: &[&str] = &["FROM", "ON", "WITH"]; +const CREATE_OBJECTS: &[&str] = &[ + "CONTINUOUS", + "DATABASE", + "MATERIALIZED", + "RETENTION", + "USER", +]; +const CREATE_TAIL: &[&str] = &["POLICY", "QUERY", "VIEW"]; +const DROP_OBJECTS: &[&str] = &[ + "CONTINUOUS", + "DATABASE", + "MATERIALIZED", + "MEASUREMENT", + "RETENTION", + "USER", +]; +const SELECT_KEYWORDS: &[&str] = &["FROM", "GROUP", "INTO", "LIMIT", "ORDER", "SELECT", "WHERE"]; +const SQL_KEYWORDS: &[&str] = &[ + "AS", "BY", "FROM", "GROUP", "INTO", "LIMIT", "ON", "ORDER", "WHERE", "WITH", +]; + +#[derive(Default)] +pub struct CompletionCache { + databases: Vec, + measurements: Vec, +} + +impl CompletionCache { + pub fn clear_measurements(&mut self) { + self.measurements.clear(); + } +} + +pub async fn refresh_databases_cache( + cache: &Arc>, + client: &HyperbytedbClient, +) -> crate::error::Result<()> { + let databases = fetch_databases(client).await?; + cache.write().databases = databases; + Ok(()) +} + +pub async fn refresh_measurements_cache( + cache: &Arc>, + client: &HyperbytedbClient, + db: &str, +) -> crate::error::Result<()> { + let measurements = fetch_measurements(client, db).await?; + cache.write().measurements = measurements; + Ok(()) +} + +pub fn clear_measurements_cache(cache: &Arc>) { + cache.write().clear_measurements(); +} + +async fn fetch_databases(client: &HyperbytedbClient) -> crate::error::Result> { + let resp = client + .query( + "SHOW DATABASES", + &QueryOptions { + db: None, + epoch: None, + pretty: false, + chunked: false, + chunk_size: None, + format: OutputFormat::Json, + params: None, + }, + ) + .await?; + Ok(first_column_values(&resp)) +} + +async fn fetch_measurements( + client: &HyperbytedbClient, + db: &str, +) -> crate::error::Result> { + let resp = client + .query( + "SHOW MEASUREMENTS", + &QueryOptions { + db: Some(db.to_string()), + epoch: None, + pretty: false, + chunked: false, + chunk_size: None, + format: OutputFormat::Json, + params: None, + }, + ) + .await?; + Ok(first_column_values(&resp)) +} + +pub struct CliHelper { + cache: Arc>, +} + +impl CliHelper { + pub fn new(cache: Arc>) -> Self { + Self { cache } + } +} + +impl Helper for CliHelper {} +impl Highlighter for CliHelper {} +impl Hinter for CliHelper { + type Hint = String; +} +impl Validator for CliHelper {} + +impl Completer for CliHelper { + type Candidate = Pair; + + fn complete( + &self, + line: &str, + pos: usize, + _ctx: &Context<'_>, + ) -> RustyResult<(usize, Vec)> { + let cache = self.cache.read(); + Ok(complete_line(line, pos, &cache)) + } +} + +fn complete_line(line: &str, pos: usize, cache: &CompletionCache) -> (usize, Vec) { + let (start, tokens, word) = parse_context(line, pos); + let candidates = if tokens.is_empty() { + complete_first_word(&word) + } else if is_meta_command_context(&tokens) { + complete_meta(&tokens, &word, cache) + } else { + complete_sql(&tokens, &word, cache) + }; + (start, candidates) +} + +fn parse_context(line: &str, pos: usize) -> (usize, Vec, String) { + let safe_pos = pos.min(line.len()); + let before = &line[..safe_pos]; + let start = before + .char_indices() + .rfind(|(_, c)| c.is_whitespace()) + .map(|(idx, c)| idx + c.len_utf8()) + .unwrap_or(0); + let word = line[start..safe_pos].to_string(); + let prefix = line[..start].trim(); + let tokens = prefix + .split_whitespace() + .map(ToString::to_string) + .collect::>(); + (start, tokens, word) +} + +fn complete_first_word(word: &str) -> Vec { + let mut out = filter_prefix(META_COMMANDS, word, false); + out.extend(filter_prefix(SQL_VERBS, word, true)); + out.sort_by(|a, b| { + a.display + .to_ascii_lowercase() + .cmp(&b.display.to_ascii_lowercase()) + }); + out +} + +fn is_meta_command_context(tokens: &[String]) -> bool { + matches!( + tokens[0].to_ascii_lowercase().as_str(), + "auth" + | "chunk" + | "clear" + | "connect" + | "consistency" + | "exit" + | "format" + | "help" + | "history" + | "insert" + | "precision" + | "pretty" + | "quit" + | "settings" + | "timing" + | "use" + ) +} + +fn complete_meta(tokens: &[String], word: &str, cache: &CompletionCache) -> Vec { + match tokens[0].to_ascii_lowercase().as_str() { + "format" if tokens.len() == 1 => filter_prefix(FORMAT_VALUES, word, false), + "clear" if tokens.len() == 1 => filter_prefix(CLEAR_TARGETS, word, false), + "consistency" if tokens.len() == 1 => filter_prefix(CONSISTENCY_VALUES, word, false), + "precision" if tokens.len() == 1 => filter_prefix(PRECISION_VALUES, word, false), + "use" if tokens.len() == 1 => filter_owned(&cache.databases, word), + "chunk" if tokens.len() == 1 => filter_prefix(&["size"], word, false), + "connect" | "insert" | "auth" | "help" | "history" | "settings" | "pretty" | "timing" + | "exit" | "quit" => Vec::new(), + _ if tokens.len() == 1 => complete_first_word(word), + _ => Vec::new(), + } +} + +fn complete_sql(tokens: &[String], word: &str, cache: &CompletionCache) -> Vec { + match tokens[0].to_ascii_uppercase().as_str() { + "SHOW" => complete_show(&tokens[1..], word, cache), + "CREATE" => complete_create(&tokens[1..], word), + "DROP" => complete_drop(&tokens[1..], word), + "SELECT" | "DELETE" => complete_from_context(&tokens[1..], word, cache), + _ => filter_prefix(SQL_KEYWORDS, word, true), + } +} + +fn complete_show(rest: &[String], word: &str, cache: &CompletionCache) -> Vec { + if rest.is_empty() { + return filter_prefix(SHOW_OBJECTS, word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("TAG") { + return filter_prefix(TAG_SUBCOMMANDS, word, true); + } + if rest.len() >= 2 && rest[0].eq_ignore_ascii_case("TAG") { + if rest.len() == 2 { + return filter_prefix(TAG_SUBCOMMANDS, word, true); + } + return complete_name_context(rest, word, cache); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("FIELD") { + return filter_prefix(&["KEYS"], word, true); + } + if rest.len() == 1 + && (rest[0].eq_ignore_ascii_case("CONTINUOUS") + || rest[0].eq_ignore_ascii_case("MATERIALIZED")) + { + return filter_prefix(&["QUERIES", "VIEWS"], word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("RETENTION") { + return filter_prefix(&["POLICIES"], word, true); + } + complete_name_context(rest, word, cache) +} + +fn complete_create(rest: &[String], word: &str) -> Vec { + if rest.is_empty() { + return filter_prefix(CREATE_OBJECTS, word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("RETENTION") { + return filter_prefix(&["POLICY"], word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("CONTINUOUS") { + return filter_prefix(&["QUERY"], word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("MATERIALIZED") { + return filter_prefix(&["VIEW"], word, true); + } + if rest.len() >= 2 + && rest[0].eq_ignore_ascii_case("RETENTION") + && rest[1].eq_ignore_ascii_case("POLICY") + { + return filter_prefix(&["ON"], word, true); + } + filter_prefix(CREATE_TAIL, word, true) +} + +fn complete_drop(rest: &[String], word: &str) -> Vec { + if rest.is_empty() { + return filter_prefix(DROP_OBJECTS, word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("RETENTION") { + return filter_prefix(&["POLICY"], word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("CONTINUOUS") { + return filter_prefix(&["QUERY"], word, true); + } + if rest.len() == 1 && rest[0].eq_ignore_ascii_case("MATERIALIZED") { + return filter_prefix(&["VIEW"], word, true); + } + filter_prefix(SQL_KEYWORDS, word, true) +} + +fn complete_from_context(rest: &[String], word: &str, cache: &CompletionCache) -> Vec { + if rest.last().is_some_and(|t| t.eq_ignore_ascii_case("FROM")) { + return filter_owned(&cache.measurements, word); + } + let mut out = filter_prefix(SELECT_KEYWORDS, word, true); + if rest.is_empty() { + out.extend(filter_prefix(&["*"], word, true)); + } + out +} + +fn complete_name_context(rest: &[String], word: &str, cache: &CompletionCache) -> Vec { + if rest + .last() + .is_some_and(|t| t.eq_ignore_ascii_case("FROM") || t.eq_ignore_ascii_case("ON")) + { + return filter_owned(&cache.measurements, word); + } + filter_prefix(SHOW_MODIFIERS, word, true) +} + +fn filter_prefix(candidates: &[&str], word: &str, uppercase: bool) -> Vec { + let needle = word.to_ascii_lowercase(); + candidates + .iter() + .filter(|candidate| candidate.to_ascii_lowercase().starts_with(&needle)) + .map(|candidate| pair(candidate, uppercase)) + .collect() +} + +fn filter_owned(candidates: &[String], word: &str) -> Vec { + let needle = word.to_ascii_lowercase(); + candidates + .iter() + .filter(|candidate| candidate.to_ascii_lowercase().starts_with(&needle)) + .map(|candidate| pair(candidate, false)) + .collect() +} + +fn pair(text: &str, uppercase: bool) -> Pair { + let replacement = if uppercase { + text.to_ascii_uppercase() + } else { + text.to_string() + }; + Pair { + display: replacement.clone(), + replacement, + } +} + +fn first_column_values(response: &QueryResponse) -> Vec { + let mut out = Vec::new(); + for result in &response.results { + let Some(series_list) = &result.series else { + continue; + }; + for series in series_list { + for row in &series.values { + if let Some(value) = row.first().and_then(|v| v.as_str()) { + out.push(value.to_string()); + } + } + } + } + out.sort(); + out.dedup(); + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn completes_meta_command_prefix() { + let cache = CompletionCache::default(); + let (_, candidates) = complete_line("fo", 2, &cache); + assert!(candidates.iter().any(|c| c.replacement == "format")); + } + + #[test] + fn completes_format_values() { + let cache = CompletionCache::default(); + let (_, candidates) = complete_line("format j", 8, &cache); + assert!(candidates.iter().any(|c| c.replacement == "json")); + } + + #[test] + fn completes_show_keywords() { + let cache = CompletionCache::default(); + let (_, candidates) = complete_line("SHOW M", 6, &cache); + assert!(candidates.iter().any(|c| c.replacement == "MEASUREMENTS")); + } + + #[test] + fn completes_show_tag_subcommands() { + let cache = CompletionCache::default(); + let (_, candidates) = complete_line("SHOW TAG K", 10, &cache); + assert!(candidates.iter().any(|c| c.replacement == "KEYS")); + } + + #[test] + fn completes_database_names_for_use() { + let cache = CompletionCache { + databases: vec!["metrics".to_string(), "telemetry".to_string()], + ..Default::default() + }; + let (_, candidates) = complete_line("use te", 6, &cache); + assert!(candidates.iter().any(|c| c.replacement == "telemetry")); + } +} diff --git a/hyperbytedb-cli/src/repl/meta.rs b/hyperbytedb-cli/src/repl/meta.rs index cf4246b..5e6ba36 100644 --- a/hyperbytedb-cli/src/repl/meta.rs +++ b/hyperbytedb-cli/src/repl/meta.rs @@ -1,7 +1,12 @@ use crate::client::{HyperbytedbClient, WriteOptions}; use crate::config::resolve_host; use crate::error::{CliError, Result}; +use crate::repl::complete::{ + CompletionCache, clear_measurements_cache, refresh_measurements_cache, +}; use crate::session::{OutputFormat, Session}; +use parking_lot::RwLock; +use std::sync::Arc; pub enum MetaAction { Continue, @@ -13,6 +18,7 @@ pub async fn handle_meta( session: &mut Session, client: &mut HyperbytedbClient, line: &str, + cache: &Arc>, ) -> Result { let lower = line.to_ascii_lowercase(); let parts: Vec<&str> = line.split_whitespace().collect(); @@ -83,12 +89,18 @@ pub async fn handle_meta( "Using database {}", session.database.as_deref().unwrap_or("") ); + if let Some(db) = session.effective_database() + && let Err(e) = refresh_measurements_cache(cache, client, db).await + { + eprintln!("warning: could not refresh measurement names for tab completion: {e}"); + } return Ok(MetaAction::Continue); } if parts.len() >= 2 && parts[0].eq_ignore_ascii_case("clear") { match parts[1].to_ascii_lowercase().as_str() { "database" | "db" => { session.clear_database(); + clear_measurements_cache(cache); println!("database cleared"); } "retention" | "rp" | "retention policy" => { @@ -192,6 +204,9 @@ fn print_help() { history Show history hint exit, quit Exit the shell +Press Tab to autocomplete meta-commands and TimeseriesQL keywords. +Database and measurement names are suggested when available. + DDL examples (sent to /query): CREATE MATERIALIZED VIEW "mv_5m" ON "db" AS SELECT mean("value") INTO "cpu_5m" FROM "cpu" GROUP BY time(5m), * diff --git a/hyperbytedb-cli/src/repl/mod.rs b/hyperbytedb-cli/src/repl/mod.rs index 894da6d..c28349e 100644 --- a/hyperbytedb-cli/src/repl/mod.rs +++ b/hyperbytedb-cli/src/repl/mod.rs @@ -1,8 +1,12 @@ +mod complete; mod meta; +use std::sync::Arc; use std::time::Instant; -use rustyline::DefaultEditor; +use parking_lot::RwLock; +use rustyline::Config; +use rustyline::Editor; use rustyline::error::ReadlineError; use crate::client::{HyperbytedbClient, QueryOptions}; @@ -11,6 +15,7 @@ use crate::error::{CliError, Result}; use crate::output::format_response; use crate::session::Session; +use complete::{CliHelper, CompletionCache, refresh_databases_cache, refresh_measurements_cache}; use meta::{MetaAction, handle_meta, is_meta_command}; pub async fn run_repl(mut session: Session) -> Result<()> { @@ -32,7 +37,19 @@ pub async fn run_repl(mut session: Session) -> Result<()> { } let history_path = history_file_path(); - let mut rl = DefaultEditor::new().map_err(|e| CliError::Other(e.to_string()))?; + let cache = Arc::new(RwLock::new(CompletionCache::default())); + if let Err(e) = refresh_databases_cache(&cache, &client).await { + eprintln!("warning: could not load database names for tab completion: {e}"); + } + if let Some(db) = session.effective_database() + && let Err(e) = refresh_measurements_cache(&cache, &client, db).await + { + eprintln!("warning: could not load measurement names for tab completion: {e}"); + } + + let config = Config::builder().build(); + let mut rl = Editor::with_config(config).map_err(|e| CliError::Other(e.to_string()))?; + rl.set_helper(Some(CliHelper::new(cache.clone()))); let _ = rl.load_history(&history_path); loop { @@ -52,7 +69,7 @@ pub async fn run_repl(mut session: Session) -> Result<()> { let _ = rl.add_history_entry(trimmed); if is_meta_command(trimmed) { - match handle_meta(&mut session, &mut client, trimmed).await? { + match handle_meta(&mut session, &mut client, trimmed, &cache).await? { MetaAction::Exit => break, MetaAction::Continue | MetaAction::Executed => continue, } diff --git a/hyperbytedb/src/adapters/chdb/native_adapter.rs b/hyperbytedb/src/adapters/chdb/native_adapter.rs index fc5b420..921e8ea 100644 --- a/hyperbytedb/src/adapters/chdb/native_adapter.rs +++ b/hyperbytedb/src/adapters/chdb/native_adapter.rs @@ -47,6 +47,7 @@ use parking_lot::RwLock; use crate::adapters::chdb::catalog; use crate::adapters::chdb::session::{SharedSession, SyncSession}; +use crate::application::ingest_metadata::backfill_tag_metadata; use crate::application::system_trace; use crate::domain::chdb_naming::{ field_column_name, quote_backticks, quoted_series_table_name, quoted_table_name, @@ -654,6 +655,21 @@ impl ChdbNativeAdapter { .await { tracing::warn!(error = %e, "failed to persist series metadata; re-registers after restart"); + } else { + let mut tag_pairs: Vec<(String, String)> = Vec::new(); + for (_, p) in &new_series { + for (k, v) in &p.tags { + tag_pairs.push((k.clone(), v.clone())); + } + } + if let Err(e) = + backfill_tag_metadata(meta, &key.db, &key.measurement, tag_pairs).await + { + tracing::warn!( + error = %e, + "failed to backfill tag metadata from flushed series" + ); + } } } diff --git a/hyperbytedb/src/application/ingest_metadata.rs b/hyperbytedb/src/application/ingest_metadata.rs index 00168e9..eb0ad99 100644 --- a/hyperbytedb/src/application/ingest_metadata.rs +++ b/hyperbytedb/src/application/ingest_metadata.rs @@ -116,6 +116,69 @@ impl IngestSchemaCache { } } +/// Merge tag keys from durable metadata with those seen in the current batch. +async fn merged_tag_keys( + metadata: &Arc, + db: &str, + measurement: &str, + batch_tag_keys: &BTreeSet, +) -> Result, HyperbytedbError> { + let mut tag_keys = batch_tag_keys.clone(); + if let Some(existing) = metadata.get_measurement(db, measurement).await? { + for k in &existing.tag_keys { + tag_keys.insert(k.clone()); + } + } + Ok(tag_keys) +} + +/// Ensure SHOW TAG KEYS/VALUES indexes reflect tags observed on written points. +/// Merges with existing measurement metadata rather than replacing tag keys. +pub async fn backfill_tag_metadata( + metadata: &Arc, + db: &str, + measurement: &str, + tags: impl IntoIterator, +) -> Result<(), HyperbytedbError> { + let mut tag_keys: BTreeSet = BTreeSet::new(); + let mut seen: HashSet<(String, String)> = HashSet::new(); + let mut tag_batch: Vec<(String, String)> = Vec::new(); + for (k, v) in tags { + tag_keys.insert(k.clone()); + if seen.insert((k.clone(), v.clone())) { + tag_batch.push((k, v)); + } + } + if tag_keys.is_empty() { + return Ok(()); + } + + let mut meas_updates: Vec = Vec::new(); + if let Some(mut existing) = metadata.get_measurement(db, measurement).await? { + let before_len = existing.tag_keys.len(); + for k in &tag_keys { + if !existing.tag_keys.contains(k) { + existing.tag_keys.push(k.clone()); + } + } + existing.tag_keys.sort(); + if existing.tag_keys.len() != before_len { + meas_updates.push(existing); + } + } else { + meas_updates.push(MeasurementMeta { + name: measurement.to_string(), + field_types: HashMap::new(), + tag_keys: tag_keys.into_iter().collect(), + }); + } + + let tag_entries = vec![(measurement.to_string(), tag_batch)]; + metadata + .register_metadata_batch(db, &meas_updates, &tag_entries) + .await +} + /// Fast-path metadata preparation for columnar batches. /// /// Works directly from the wire format without expanding to `Vec`, @@ -158,7 +221,6 @@ pub async fn prepare_columnar_metadata( } }) .collect(); - sc.mark_tags(&novel_hashes); if !novel_hashes.is_empty() { let tag_batch: Vec<(String, String)> = batch @@ -171,14 +233,10 @@ pub async fn prepare_columnar_metadata( .map(|(k, v)| (k.clone(), v.clone())) .collect(); if !tag_batch.is_empty() { - let meta = metadata.clone(); - let db_owned = db.to_string(); - let meas = batch.measurement.clone(); - tokio::spawn(async move { - let _ = meta - .register_metadata_batch(&db_owned, &[], &[(meas, tag_batch)]) - .await; - }); + metadata + .register_metadata_batch(db, &[], &[(batch.measurement.clone(), tag_batch)]) + .await?; + sc.mark_tags(&novel_hashes); } } metrics::counter!("hyperbytedb_ingest_schema_cache_hits_total").increment(1); @@ -234,7 +292,10 @@ pub async fn prepare_columnar_metadata( let metas = vec![MeasurementMeta { name: batch.measurement.clone(), field_types: field_types.clone(), - tag_keys: tag_keys.iter().cloned().collect(), + tag_keys: merged_tag_keys(metadata, db, &batch.measurement, &tag_keys) + .await? + .into_iter() + .collect(), }]; let tag_batch: Vec<(String, String)> = batch @@ -251,7 +312,8 @@ pub async fn prepare_columnar_metadata( metadata.register_metadata_batch(db, &metas, &tags).await?; if let Some(sc) = schema_cache { - sc.mark_schema(db, &batch.measurement, &field_types, &tag_keys); + let merged = merged_tag_keys(metadata, db, &batch.measurement, &tag_keys).await?; + sc.mark_schema(db, &batch.measurement, &field_types, &merged); let novel_hashes: Vec<(u64,)> = batch .tags .iter() @@ -303,9 +365,10 @@ pub async fn prepare_batch_metadata( return Ok(()); } - // Schema hit but novel tag values — update cache immediately and - // fire-and-forget the RocksDB persistence. Tag values are only - // needed for SHOW TAG VALUES queries, not write correctness. + // Schema hit but novel tag values — persist before updating the cache. + // Tag values are only needed for SHOW TAG VALUES queries, not write + // correctness, but import-style bulk loads query metadata immediately + // after the write returns. let novel_hashes: Vec<(u64,)> = points .iter() .flat_map(|p| { @@ -319,9 +382,8 @@ pub async fn prepare_batch_metadata( }) }) .collect(); - sc.mark_tags(&novel_hashes); - // Collect only truly novel tag values for async persistence + // Collect only truly novel tag values for persistence let novel_set: HashSet = novel_hashes.iter().map(|(h,)| *h).collect(); let mut tag_batch_for_bg: Vec<(String, Vec<(String, String)>)> = Vec::new(); for meas_name in measurements.keys() { @@ -340,13 +402,10 @@ pub async fn prepare_batch_metadata( } } if !tag_batch_for_bg.is_empty() { - let meta = metadata.clone(); - let db_owned = db.to_string(); - tokio::spawn(async move { - let _ = meta - .register_metadata_batch(&db_owned, &[], &tag_batch_for_bg) - .await; - }); + metadata + .register_metadata_batch(db, &[], &tag_batch_for_bg) + .await?; + sc.mark_tags(&novel_hashes); } metrics::counter!("hyperbytedb_ingest_schema_cache_hits_total").increment(1); return Ok(()); @@ -417,10 +476,11 @@ pub async fn prepare_batch_metadata( let mut all_tags: Vec<(String, Vec<(String, String)>)> = Vec::with_capacity(measurements.len()); for (meas_name, (field_types, _tag_keys)) in &measurements { + let merged = merged_tag_keys(metadata, db, meas_name, &measurements[meas_name].1).await?; all_metas.push(MeasurementMeta { name: meas_name.clone(), field_types: field_types.clone(), - tag_keys: measurements[meas_name].1.iter().cloned().collect(), + tag_keys: merged.into_iter().collect(), }); let mut seen: HashSet<(String, String)> = HashSet::new(); @@ -443,7 +503,8 @@ pub async fn prepare_batch_metadata( if let Some(sc) = schema_cache { for (name, (fields, tags)) in &measurements { - sc.mark_schema(db, name, fields, tags); + let merged = merged_tag_keys(metadata, db, name, tags).await?; + sc.mark_schema(db, name, fields, &merged); } let novel_hashes: Vec<(u64,)> = points .iter() diff --git a/hyperbytedb/tests/compat/metadata_tests.rs b/hyperbytedb/tests/compat/metadata_tests.rs index e1e33a8..1fabda8 100644 --- a/hyperbytedb/tests/compat/metadata_tests.rs +++ b/hyperbytedb/tests/compat/metadata_tests.rs @@ -271,6 +271,84 @@ async fn show_tag_values_response_shape() { assert!(values.contains(&"server02")); } +#[tokio::test] +async fn show_tag_values_visible_after_steady_state_ingest() { + let ctx = TestContext::new_no_chdb().unwrap(); + ctx.metadata.create_database("testdb").await.unwrap(); + ctx.ingestion + .ingest( + "testdb", + None, + None, + b"cpu,host=server01 value=1.0 1000000000", + WritePayloadFormat::LineProtocol, + ) + .await + .unwrap(); + ctx.ingestion + .ingest( + "testdb", + None, + None, + b"cpu,host=server02 value=2.0 2000000000", + WritePayloadFormat::LineProtocol, + ) + .await + .unwrap(); + + let resp = ctx + .query("testdb", "SHOW TAG VALUES FROM cpu WITH KEY = \"host\"") + .await + .unwrap(); + assert!(resp.results[0].error.is_none()); + let values: Vec<&str> = resp.results[0].series.as_ref().unwrap()[0] + .values + .iter() + .filter_map(|row| row.get(1).and_then(|v| v.as_str())) + .collect(); + assert!(values.contains(&"server01")); + assert!(values.contains(&"server02")); +} + +#[tokio::test] +async fn show_tag_keys_not_cleared_by_tagless_batch() { + let ctx = TestContext::new_no_chdb().unwrap(); + ctx.metadata.create_database("testdb").await.unwrap(); + ctx.ingestion + .ingest( + "testdb", + None, + None, + b"cpu,host=server01 value=1.0 1000000000", + WritePayloadFormat::LineProtocol, + ) + .await + .unwrap(); + ctx.ingestion + .ingest( + "testdb", + None, + None, + b"cpu value=2.0 2000000000", + WritePayloadFormat::LineProtocol, + ) + .await + .unwrap(); + + let resp = ctx.query("testdb", "SHOW TAG KEYS FROM cpu").await.unwrap(); + assert!(resp.results[0].error.is_none()); + let keys: Vec<&str> = resp.results[0].series.as_ref().unwrap()[0] + .values + .iter() + .filter_map(|row| row.first().and_then(|v| v.as_str())) + .collect(); + assert!( + keys.contains(&"host"), + "tag keys should survive a tag-less batch: {:?}", + keys + ); +} + #[tokio::test] async fn show_tag_values_from_specific_measurement() { let ctx = TestContext::new_no_chdb().unwrap(); diff --git a/scripts/checkout-chdb-rust.sh b/scripts/checkout-chdb-rust.sh new file mode 100755 index 0000000..889ed5f --- /dev/null +++ b/scripts/checkout-chdb-rust.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +# Clone hyperbyte-cloud/chdb-rust next to the hyperbytedb repo root. +# +# hyperbytedb/hyperbytedb/Cargo.toml uses path = "../../chdb-rust", which resolves +# to a sibling of the hyperbytedb checkout: +# /hyperbytedb/hyperbytedb +# /chdb-rust +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" +WORKSPACE="${GITHUB_WORKSPACE:-${REPO_ROOT}}" +CHDB_RUST_REPO="${CHDB_RUST_REPO:-https://github.com/hyperbyte-cloud/chdb-rust.git}" +CHDB_RUST_REF="${CHDB_RUST_REF:-feat_arrow_insert}" +DEST="$(dirname "${WORKSPACE}")/chdb-rust" + +echo "Checking out ${CHDB_RUST_REPO} @ ${CHDB_RUST_REF} -> ${DEST}" + +if [[ -d "${DEST}/.git" ]]; then + git -C "${DEST}" fetch --depth 1 origin "${CHDB_RUST_REF}" + git -C "${DEST}" checkout FETCH_HEAD +else + git clone --depth 1 --branch "${CHDB_RUST_REF}" "${CHDB_RUST_REPO}" "${DEST}" +fi + +echo "chdb-rust HEAD: $(git -C "${DEST}" rev-parse --short HEAD)" + +if [[ ! -f "${DEST}/Cargo.toml" ]]; then + echo "error: ${DEST}/Cargo.toml not found after checkout" >&2 + exit 1 +fi + +CRATE_MANIFEST="${WORKSPACE}/hyperbytedb/Cargo.toml" +if [[ -f "${CRATE_MANIFEST}" ]]; then + RESOLVED="$(python3 -c "import os; print(os.path.normpath(os.path.join(os.path.dirname('${CRATE_MANIFEST}'), '../../chdb-rust')))")" + if [[ ! -f "${RESOLVED}/Cargo.toml" ]]; then + echo "error: path dependency does not resolve (${RESOLVED})" >&2 + exit 1 + fi + echo "path dependency resolves to: ${RESOLVED}" +fi diff --git a/scripts/install-git-hooks.sh b/scripts/install-git-hooks.sh new file mode 100755 index 0000000..5dfcbd1 --- /dev/null +++ b/scripts/install-git-hooks.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# Point this repository at the version-controlled hooks in .githooks/. +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" + +cd "${REPO_ROOT}" + +chmod +x .githooks/pre-commit +git config core.hooksPath .githooks + +echo "Installed git hooks (core.hooksPath=.githooks)" +echo "Pre-commit runs: cargo fmt --check, cargo clippy --all-targets -- -D warnings"