From 147d921c4084735c2bede3620234fa244c7791a7 Mon Sep 17 00:00:00 2001 From: Wojtek Date: Fri, 26 Jun 2026 16:54:50 -0400 Subject: [PATCH] Summarize long channel messages off path --- examples/channel-memory/README.md | 24 ++-- examples/channel-memory/main.go | 2 +- examples/channel-memory/worker.go | 173 +++++++++++++++++++++++-- examples/channel-memory/worker_test.go | 97 ++++++++++++++ site/changelog.md | 1 + 5 files changed, 279 insertions(+), 18 deletions(-) diff --git a/examples/channel-memory/README.md b/examples/channel-memory/README.md index eed8d72..0223c0b 100644 --- a/examples/channel-memory/README.md +++ b/examples/channel-memory/README.md @@ -30,28 +30,33 @@ The deterministic path is intentionally conservative: - preserves obvious hard events as faithful `hard_event` blocks - keeps ordinary retained content as `raw_excerpt` blocks - collapses runtime/status noise into sparse `telemetry_count` blocks +- collapses exact short acknowledgements into sparse `low_value_ack` blocks +- collapses near-identical low-change decisions into sparse `decision_repeat` + blocks - emits coverage-gap metadata from stored gap records - creates tombstone blocks for deleted messages without carrying deleted content -Higher-quality `topic_rollup` and `sequence_rollup` blocks come from the async -LLM worker below. +Higher-quality `message_summary`, `topic_rollup`, and `sequence_rollup` blocks +come from the async LLM worker below. ## Async LLM Digest Worker An optional background worker compresses verbose `raw_excerpt` material into -sparse `topic_rollup` / `sequence_rollup` blocks using an LLM. It is strictly -off the `/digest` hot path — `/digest` only ever reads already-generated blocks -and never calls a model. +sparse `message_summary`, `topic_rollup`, or `sequence_rollup` blocks using an +LLM. It is strictly off the `/digest` hot path — `/digest` only ever reads +already-generated blocks and never calls a model. The worker is conservative by design: -- only `raw_excerpt` windows are summarized; hard events, tombstones, and - telemetry blocks keep their faithful deterministic form +- only `raw_excerpt` long messages and windows are summarized; hard events, + tombstones, telemetry blocks, and deterministic low-value rollups keep their + faithful deterministic form - every generated block requires structured JSON output citing the exact source message ids it summarized; malformed or provenance-free results are rejected and the deterministic blocks keep serving -- work is cached by source-message ids plus content hashes, so an unchanged - window is never re-summarized +- long-message work is cached by channel id, message id, content hash, + provider, model, and compactor version; window rollups are cached by + source-message ids plus content hashes - each block stores its provider, model, version, and cost in `metadata_json` - conservative per-channel and per-pod daily call caps are enforced, with usage tracked in `llm_usage`; over budget, disabled, or failing all fall back to @@ -64,6 +69,7 @@ It is disabled unless `CHANNEL_MEMORY_LLM_ENABLED=true` and are set. Tuning knobs: `CHANNEL_MEMORY_LLM_MODEL`, `CHANNEL_MEMORY_LLM_PROVIDER`, `CHANNEL_MEMORY_LLM_VERSION`, `CHANNEL_MEMORY_LLM_API_KEY`, `CHANNEL_MEMORY_LLM_WINDOW`, `CHANNEL_MEMORY_LLM_MIN_WINDOW`, +`CHANNEL_MEMORY_LLM_LONG_MESSAGE_CHARS`, `CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY`, `CHANNEL_MEMORY_LLM_PER_POD_DAILY`, `CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY_USD`, `CHANNEL_MEMORY_LLM_PER_POD_DAILY_USD`, diff --git a/examples/channel-memory/main.go b/examples/channel-memory/main.go index 7d55acf..064d2fd 100644 --- a/examples/channel-memory/main.go +++ b/examples/channel-memory/main.go @@ -1613,7 +1613,7 @@ func preferSparseRollups(blocks []digestBlock, limit int) []digestBlock { } func suppressesRawExcerpt(block digestBlock) bool { - if block.Processor == digestProcessorLLM && (block.Kind == rollupKindTopic || block.Kind == rollupKindSequence) { + if block.Processor == digestProcessorLLM && (block.Kind == rollupKindMessage || block.Kind == rollupKindTopic || block.Kind == rollupKindSequence) { return true } return block.Processor == "deterministic" && block.Kind == "decision_repeat" diff --git a/examples/channel-memory/worker.go b/examples/channel-memory/worker.go index c4b84ed..54b8e55 100644 --- a/examples/channel-memory/worker.go +++ b/examples/channel-memory/worker.go @@ -29,11 +29,13 @@ import ( const ( digestProcessorLLM = "llm" + rollupKindMessage = "message_summary" rollupKindTopic = "topic_rollup" rollupKindSequence = "sequence_rollup" defaultWorkerWindowSize = 8 defaultWorkerMinWindow = 3 + defaultWorkerLongMessageChars = 1200 defaultWorkerPerChannelCalls = 24 defaultWorkerPerPodCalls = 96 defaultWorkerPerChannelCost = 0.50 @@ -144,6 +146,7 @@ type workerConfig struct { Version string WindowSize int MinWindow int + LongMessageChars int PerChannelDailyCalls int PerPodDailyCalls int PerChannelDailyCost float64 @@ -168,6 +171,7 @@ func workerConfigFromEnv() workerConfig { Version: strings.TrimSpace(os.Getenv("CHANNEL_MEMORY_LLM_VERSION")), WindowSize: intEnv("CHANNEL_MEMORY_LLM_WINDOW", defaultWorkerWindowSize), MinWindow: intEnv("CHANNEL_MEMORY_LLM_MIN_WINDOW", defaultWorkerMinWindow), + LongMessageChars: intEnv("CHANNEL_MEMORY_LLM_LONG_MESSAGE_CHARS", defaultWorkerLongMessageChars), PerChannelDailyCalls: intEnv("CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY", defaultWorkerPerChannelCalls), PerPodDailyCalls: intEnv("CHANNEL_MEMORY_LLM_PER_POD_DAILY", defaultWorkerPerPodCalls), PerChannelDailyCost: floatEnv("CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY_USD", defaultWorkerPerChannelCost), @@ -194,6 +198,9 @@ func newDigestWorker(store *channelMemoryStore, client llmDigestClient, cfg work if cfg.MinWindow > cfg.WindowSize { cfg.MinWindow = cfg.WindowSize } + if cfg.LongMessageChars < 0 { + cfg.LongMessageChars = 0 + } if cfg.Interval <= 0 { cfg.Interval = defaultWorkerIntervalSeconds * time.Second } @@ -235,10 +242,11 @@ func (w *digestWorker) enabled() bool { return w != nil && w.cfg.Enabled && w.client != nil } -// processOnce scans every channel for raw_excerpt windows that lack a fresh -// sparse rollup and compresses them, oldest window first. It returns the number -// of blocks generated. It never blocks the /digest path and degrades to a -// no-op on any failure so deterministic blocks keep serving. +// processOnce scans every channel for long single messages and raw_excerpt +// windows that lack a fresh sparse summary/rollup, then compresses them oldest +// first. It returns the number of blocks generated. It never blocks the +// /digest path and degrades to a no-op on any failure so deterministic blocks +// keep serving. func (w *digestWorker) processOnce(ctx context.Context) (int, error) { if !w.enabled() { return 0, nil @@ -266,7 +274,81 @@ func (w *digestWorker) processOnce(ctx context.Context) (int, error) { continue } - windows, err := w.store.candidateRollupWindows(ctx, sourceKind, channelID, w.cfg.WindowSize, w.cfg.MinWindow) + podExhausted := false + channelExhausted := false + messageWindows, err := w.store.candidateLongMessageWindows(ctx, sourceKind, channelID, w.cfg.LongMessageChars) + if err != nil { + return generated, err + } + for _, window := range messageWindows { + podOK, err = w.podBudgetAvailable(ctx, day) + if err != nil { + return generated, err + } + if !podOK { + podExhausted = true + break + } + channelOK, err = w.channelBudgetAvailable(ctx, day, channelID) + if err != nil { + return generated, err + } + if !channelOK { + channelExhausted = true + break + } + + meta := blockMetadata{ + Provider: w.cfg.Provider, + Model: w.cfg.Model, + Version: w.cfg.Version, + } + key := llmMessageSummaryBlockKey(sourceKind, channelID, window, meta) + fresh, err := w.store.freshBlockExists(ctx, key) + if err != nil { + return generated, err + } + if fresh { + // Cache hit: identical message id + content hash + compactor + // identity already summarized. No LLM call. + continue + } + + if err := w.store.recordLLMUsage(ctx, day, channelID, 0); err != nil { + return generated, err + } + result, err := w.client.Summarize(ctx, promptForWindow(channelID, window)) + if err != nil { + w.logf("channel-memory digest worker: summarize long message channel %s: %v", channelID, err) + _ = w.store.recordQueueFailure(ctx, sourceKind, channelID, window, err.Error()) + continue + } + costUSD := w.costForResult(result) + result.CostUSD = costUSD + if costUSD != 0 { + if err := w.store.recordLLMCost(ctx, day, channelID, costUSD); err != nil { + return generated, err + } + } + if err := validateRollup(result, window); err != nil { + w.logf("channel-memory digest worker: rejected long-message summary for channel %s: %v", channelID, err) + _ = w.store.recordQueueFailure(ctx, sourceKind, channelID, window, err.Error()) + continue + } + meta.CostUSD = result.CostUSD + if err := w.store.writeSparseRollup(ctx, key, sourceKind, channelID, window, result, meta, w.now()); err != nil { + return generated, err + } + generated++ + } + if podExhausted { + break + } + if channelExhausted { + continue + } + + windows, err := w.store.candidateRollupWindows(ctx, sourceKind, channelID, w.cfg.WindowSize, w.cfg.MinWindow, w.cfg.LongMessageChars) if err != nil { return generated, err } @@ -425,7 +507,14 @@ func promptForWindow(channelID string, window rollupWindow) llmDigestPrompt { func validateRollup(result llmDigestResult, window rollupWindow) error { switch result.Kind { + case rollupKindMessage: + if len(window.Sources) != 1 { + return fmt.Errorf("%s requires exactly one source, got %d", rollupKindMessage, len(window.Sources)) + } case rollupKindTopic, rollupKindSequence: + if len(window.Sources) == 1 { + return fmt.Errorf("single-message summaries must use %s", rollupKindMessage) + } default: return fmt.Errorf("unexpected rollup kind %q", result.Kind) } @@ -468,6 +557,23 @@ func llmRollupBlockKey(sourceKind, channelID string, window rollupWindow) string return strings.Join([]string{"llm", sourceKind, channelID, hex.EncodeToString(sum[:])}, ":") } +func llmMessageSummaryBlockKey(sourceKind, channelID string, window rollupWindow, meta blockMetadata) string { + if len(window.Sources) != 1 { + return llmRollupBlockKey(sourceKind, channelID, window) + } + source := window.Sources[0] + sum := sha256.Sum256([]byte(strings.Join([]string{ + sourceKind, + channelID, + source.MessageID, + source.ContentHash, + meta.Provider, + meta.Model, + meta.Version, + }, "\x00"))) + return strings.Join([]string{"llm_message", sourceKind, channelID, source.MessageID, hex.EncodeToString(sum[:])}, ":") +} + type blockMetadata struct { Provider string `json:"provider"` Model string `json:"model"` @@ -479,7 +585,39 @@ type blockMetadata struct { // non-forgotten raw_excerpt messages (the verbose, low-signal material) into // ordered windows. Hard events and telemetry noise are excluded so they keep // their faithful deterministic blocks. -func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceKind, channelID string, windowSize, minWindow int) ([]rollupWindow, error) { +func (s *channelMemoryStore) candidateLongMessageWindows(ctx context.Context, sourceKind, channelID string, minChars int) ([]rollupWindow, error) { + if minChars <= 0 { + return nil, nil + } + rows, err := s.db.QueryContext(ctx, ` + SELECT id, source_kind, channel_id, message_id, content_hash, author_id, author_name, + created_at, edited_at, deleted, content, service, surface, guild_id, visibility_scope, + observed_seq, observed_at, is_current + FROM source_messages + WHERE source_kind = ? AND channel_id = ? AND is_current = 1 AND deleted = 0 AND forgotten_at = '' + ORDER BY created_at, observed_seq`, + sourceKind, channelID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + windows := make([]rollupWindow, 0) + for rows.Next() { + record, err := scanStoredSource(rows) + if err != nil { + return nil, err + } + if !isLongMessageSummaryCandidate(record.Content, minChars) { + continue + } + windows = append(windows, rollupWindow{Sources: []storedSourceMessage{record}}) + } + return windows, rows.Err() +} + +func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceKind, channelID string, windowSize, minWindow, longMessageChars int) ([]rollupWindow, error) { rows, err := s.db.QueryContext(ctx, ` SELECT id, source_kind, channel_id, message_id, content_hash, author_id, author_name, created_at, edited_at, deleted, content, service, surface, guild_id, visibility_scope, @@ -506,6 +644,9 @@ func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceK if kind, _, _ := classifySourceContent(record.Content); kind != "raw_excerpt" { continue } + if isLongMessageSummaryCandidate(record.Content, longMessageChars) { + continue + } eligible = append(eligible, record) } if err := rows.Err(); err != nil { @@ -527,6 +668,21 @@ func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceK return windows, nil } +func isLongMessageSummaryCandidate(content string, minChars int) bool { + if minChars <= 0 { + return false + } + trimmed := strings.TrimSpace(content) + if len([]rune(trimmed)) <= minChars { + return false + } + if isTelemetryNoise(trimmed) || isLowValueAcknowledgement(trimmed) { + return false + } + kind, _, _ := classifySourceContent(trimmed) + return kind == "raw_excerpt" +} + func (s *channelMemoryStore) freshBlockExists(ctx context.Context, blockKey string) (bool, error) { var one int err := s.db.QueryRowContext(ctx, @@ -719,8 +875,9 @@ func (c *httpLLMClient) Summarize(ctx context.Context, prompt llmDigestPrompt) ( for _, m := range prompt.Messages { fmt.Fprintf(&transcript, "[%s] (id=%s) %s: %s\n", m.CreatedAt, m.MessageID, m.Author, m.Content) } - system := "You compress Discord channel transcripts into one compact digest block. " + - "Respond ONLY with a JSON object: {\"kind\":\"topic_rollup\"|\"sequence_rollup\",\"text\":string,\"source_messages\":[message id strings you summarized],\"score\":0..1}. " + + system := "You compress Discord channel transcripts into compact digest blocks. " + + "For one long source message, use kind \"message_summary\". For multi-message windows, use \"topic_rollup\" or \"sequence_rollup\". " + + "Respond ONLY with a JSON object: {\"kind\":\"message_summary\"|\"topic_rollup\"|\"sequence_rollup\",\"text\":string,\"source_messages\":[message id strings you summarized],\"score\":0..1}. " + "source_messages MUST list the exact message ids you used as JSON strings, because Discord ids can be too large for safe JSON numbers. Do not invent ids." payload := map[string]any{ "model": c.model, diff --git a/examples/channel-memory/worker_test.go b/examples/channel-memory/worker_test.go index 81eae12..48e4b8a 100644 --- a/examples/channel-memory/worker_test.go +++ b/examples/channel-memory/worker_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "sync" "testing" "time" @@ -47,6 +48,19 @@ func citeAllTopicRollup(prompt llmDigestPrompt) (llmDigestResult, error) { }, nil } +func citeSingleMessageSummary(prompt llmDigestPrompt) (llmDigestResult, error) { + if len(prompt.Messages) != 1 { + return llmDigestResult{}, fmt.Errorf("expected one message, got %d", len(prompt.Messages)) + } + return llmDigestResult{ + Kind: rollupKindMessage, + Text: "Summarized long source " + prompt.Messages[0].MessageID + ".", + SourceMessages: []string{prompt.Messages[0].MessageID}, + Score: 0.8, + CostUSD: 0.001, + }, nil +} + func TestLLMDigestResultAcceptsNumericSourceMessageIDsLosslessly(t *testing.T) { raw := []byte(`{"kind":"topic_rollup","text":"summary","source_messages":[1507458107071270996,"1507458143121309849"],"score":0.7}`) var result llmDigestResult @@ -250,6 +264,89 @@ func TestDigestWorkerCompressesVerboseToSparseAndKeepsHardEvents(t *testing.T) { } } +func TestDigestWorkerSummarizesLongMessagesIndividually(t *testing.T) { + store := newTestStore(t) + defer store.Close() + base := time.Date(2026, 5, 21, 18, 0, 0, 0, time.UTC) + + longContent := "Long operational note: " + strings.Repeat("details that should be compacted before feed injection. ", 8) + ingestRaw(t, store, "chan-1", "901", longContent, base, "sha256:long-901") + ingestRaw(t, store, "chan-1", "902", "short context that should remain raw", base.Add(time.Minute), "sha256:short-902") + _, err := store.Ingest(context.Background(), ingestRequest{ + ChannelID: "chan-1", + Message: ingestMessage{ + ID: "903", AuthorName: "lead", CreatedAt: base.Add(2 * time.Minute).Format(time.RFC3339), + Content: "[APPROVED] signal-903 BUY ACME " + strings.Repeat("risk controls stay verbatim. ", 4), + ContentHash: "sha256:hard-903", + }, + }) + if err != nil { + t.Fatalf("ingest hard event: %v", err) + } + + client := &fakeLLMClient{respond: citeSingleMessageSummary} + worker := testWorker(store, client, func(c *workerConfig) { + c.LongMessageChars = 80 + }) + + generated, err := worker.processOnce(context.Background()) + if err != nil { + t.Fatalf("processOnce: %v", err) + } + if generated != 1 { + t.Fatalf("expected 1 long-message summary, got %d", generated) + } + if client.callCount() != 1 { + t.Fatalf("expected exactly 1 LLM call, got %d", client.callCount()) + } + if got := client.prompts[0].Messages; len(got) != 1 || got[0].MessageID != "901" { + t.Fatalf("expected only long raw message 901 summarized, got %+v", got) + } + + resp := digestFor(t, store, "chan-1") + if resp.Cost.DeterministicOnly { + t.Fatalf("expected deterministic_only=false once a message summary exists: %+v", resp.Cost) + } + var summary digestBlock + foundSummary := false + foundShortRaw := false + foundHardEvent := false + for _, b := range resp.Blocks { + if b.Kind == rollupKindMessage { + foundSummary = true + summary = b + } + if b.Kind == "raw_excerpt" && len(b.SourceMessages) == 1 && b.SourceMessages[0] == "901" { + t.Fatalf("long source 901 should be suppressed by message_summary: %+v", b) + } + if b.Kind == "raw_excerpt" && len(b.SourceMessages) == 1 && b.SourceMessages[0] == "902" { + foundShortRaw = true + } + if b.Kind == "hard_event" && len(b.SourceMessages) == 1 && b.SourceMessages[0] == "903" { + foundHardEvent = true + } + } + if !foundSummary { + t.Fatalf("expected message_summary block, got %+v", resp.Blocks) + } + if !summary.Sparse || summary.Processor != digestProcessorLLM || len(summary.SourceMessages) != 1 || summary.SourceMessages[0] != "901" { + t.Fatalf("unexpected message_summary provenance: %+v", summary) + } + if !foundShortRaw { + t.Fatalf("short context-bearing raw message should remain explicit: %+v", resp.Blocks) + } + if !foundHardEvent { + t.Fatalf("hard event should remain explicit: %+v", resp.Blocks) + } + + if _, err := worker.processOnce(context.Background()); err != nil { + t.Fatalf("processOnce #2: %v", err) + } + if client.callCount() != 1 { + t.Fatalf("expected long-message summary cache hit, got %d calls", client.callCount()) + } +} + // TestDigestWorkerRejectsMalformedAndProvenanceFreeOutput proves invalid model // output never becomes a block and the adapter stays deterministic. func TestDigestWorkerRejectsMalformedAndProvenanceFreeOutput(t *testing.T) { diff --git a/site/changelog.md b/site/changelog.md index cb0bba9..332e7a5 100644 --- a/site/changelog.md +++ b/site/changelog.md @@ -31,6 +31,7 @@ outline: deep - **Channel memory collapses repeated low-change decisions** -- deterministic channel-memory now emits a sparse `decision_repeat` block for same-author, same-channel, same-day repeated no-change decisions while preserving source message IDs for lookup. The digest assembler prefers that block over the covered raw excerpts, reducing repetitive context without hiding hard events. Refs [#260](https://github.com/mostlydev/clawdapus/issues/260). - **Channel memory elides low-value acknowledgements** -- exact short acknowledgements such as "ok", "thanks", and "got it" now collapse into sparse `low_value_ack` digest blocks by channel/hour before they can bloat raw awareness feeds. Context-bearing messages and hard events remain explicit. Refs [#260](https://github.com/mostlydev/clawdapus/issues/260). +- **Channel memory summarizes long messages off the hot path** -- the async digest worker now generates cached `message_summary` blocks for individual long raw messages, keyed by message identity, content hash, provider, model, and compactor version. `/digest` prefers the summary over the full raw excerpt while hard events and shorter context-bearing messages remain explicit. Refs [#260](https://github.com/mostlydev/clawdapus/issues/260). ## v0.24.0 {#v0-24-0}