Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions examples/channel-memory/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,33 @@ The deterministic path is intentionally conservative:
- preserves obvious hard events as faithful `hard_event` blocks
- keeps ordinary retained content as `raw_excerpt` blocks
- collapses runtime/status noise into sparse `telemetry_count` blocks
- collapses exact short acknowledgements into sparse `low_value_ack` blocks
- collapses near-identical low-change decisions into sparse `decision_repeat`
blocks
- emits coverage-gap metadata from stored gap records
- creates tombstone blocks for deleted messages without carrying deleted content

Higher-quality `topic_rollup` and `sequence_rollup` blocks come from the async
LLM worker below.
Higher-quality `message_summary`, `topic_rollup`, and `sequence_rollup` blocks
come from the async LLM worker below.

## Async LLM Digest Worker

An optional background worker compresses verbose `raw_excerpt` material into
sparse `topic_rollup` / `sequence_rollup` blocks using an LLM. It is strictly
off the `/digest` hot path — `/digest` only ever reads already-generated blocks
and never calls a model.
sparse `message_summary`, `topic_rollup`, or `sequence_rollup` blocks using an
LLM. It is strictly off the `/digest` hot path — `/digest` only ever reads
already-generated blocks and never calls a model.

The worker is conservative by design:

- only `raw_excerpt` windows are summarized; hard events, tombstones, and
telemetry blocks keep their faithful deterministic form
- only `raw_excerpt` long messages and windows are summarized; hard events,
tombstones, telemetry blocks, and deterministic low-value rollups keep their
faithful deterministic form
- every generated block requires structured JSON output citing the exact source
message ids it summarized; malformed or provenance-free results are rejected
and the deterministic blocks keep serving
- work is cached by source-message ids plus content hashes, so an unchanged
window is never re-summarized
- long-message work is cached by channel id, message id, content hash,
provider, model, and compactor version; window rollups are cached by
source-message ids plus content hashes
- each block stores its provider, model, version, and cost in `metadata_json`
- conservative per-channel and per-pod daily call caps are enforced, with usage
tracked in `llm_usage`; over budget, disabled, or failing all fall back to
Expand All @@ -64,6 +69,7 @@ It is disabled unless `CHANNEL_MEMORY_LLM_ENABLED=true` and
are set. Tuning knobs: `CHANNEL_MEMORY_LLM_MODEL`, `CHANNEL_MEMORY_LLM_PROVIDER`,
`CHANNEL_MEMORY_LLM_VERSION`, `CHANNEL_MEMORY_LLM_API_KEY`,
`CHANNEL_MEMORY_LLM_WINDOW`, `CHANNEL_MEMORY_LLM_MIN_WINDOW`,
`CHANNEL_MEMORY_LLM_LONG_MESSAGE_CHARS`,
`CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY`, `CHANNEL_MEMORY_LLM_PER_POD_DAILY`,
`CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY_USD`,
`CHANNEL_MEMORY_LLM_PER_POD_DAILY_USD`,
Expand Down
2 changes: 1 addition & 1 deletion examples/channel-memory/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,7 @@ func preferSparseRollups(blocks []digestBlock, limit int) []digestBlock {
}

func suppressesRawExcerpt(block digestBlock) bool {
if block.Processor == digestProcessorLLM && (block.Kind == rollupKindTopic || block.Kind == rollupKindSequence) {
if block.Processor == digestProcessorLLM && (block.Kind == rollupKindMessage || block.Kind == rollupKindTopic || block.Kind == rollupKindSequence) {
return true
}
return block.Processor == "deterministic" && block.Kind == "decision_repeat"
Expand Down
173 changes: 165 additions & 8 deletions examples/channel-memory/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (

const (
digestProcessorLLM = "llm"
rollupKindMessage = "message_summary"
rollupKindTopic = "topic_rollup"
rollupKindSequence = "sequence_rollup"

defaultWorkerWindowSize = 8
defaultWorkerMinWindow = 3
defaultWorkerLongMessageChars = 1200
defaultWorkerPerChannelCalls = 24
defaultWorkerPerPodCalls = 96
defaultWorkerPerChannelCost = 0.50
Expand Down Expand Up @@ -144,6 +146,7 @@ type workerConfig struct {
Version string
WindowSize int
MinWindow int
LongMessageChars int
PerChannelDailyCalls int
PerPodDailyCalls int
PerChannelDailyCost float64
Expand All @@ -168,6 +171,7 @@ func workerConfigFromEnv() workerConfig {
Version: strings.TrimSpace(os.Getenv("CHANNEL_MEMORY_LLM_VERSION")),
WindowSize: intEnv("CHANNEL_MEMORY_LLM_WINDOW", defaultWorkerWindowSize),
MinWindow: intEnv("CHANNEL_MEMORY_LLM_MIN_WINDOW", defaultWorkerMinWindow),
LongMessageChars: intEnv("CHANNEL_MEMORY_LLM_LONG_MESSAGE_CHARS", defaultWorkerLongMessageChars),
PerChannelDailyCalls: intEnv("CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY", defaultWorkerPerChannelCalls),
PerPodDailyCalls: intEnv("CHANNEL_MEMORY_LLM_PER_POD_DAILY", defaultWorkerPerPodCalls),
PerChannelDailyCost: floatEnv("CHANNEL_MEMORY_LLM_PER_CHANNEL_DAILY_USD", defaultWorkerPerChannelCost),
Expand All @@ -194,6 +198,9 @@ func newDigestWorker(store *channelMemoryStore, client llmDigestClient, cfg work
if cfg.MinWindow > cfg.WindowSize {
cfg.MinWindow = cfg.WindowSize
}
if cfg.LongMessageChars < 0 {
cfg.LongMessageChars = 0
}
if cfg.Interval <= 0 {
cfg.Interval = defaultWorkerIntervalSeconds * time.Second
}
Expand Down Expand Up @@ -235,10 +242,11 @@ func (w *digestWorker) enabled() bool {
return w != nil && w.cfg.Enabled && w.client != nil
}

// processOnce scans every channel for raw_excerpt windows that lack a fresh
// sparse rollup and compresses them, oldest window first. It returns the number
// of blocks generated. It never blocks the /digest path and degrades to a
// no-op on any failure so deterministic blocks keep serving.
// processOnce scans every channel for long single messages and raw_excerpt
// windows that lack a fresh sparse summary/rollup, then compresses them oldest
// first. It returns the number of blocks generated. It never blocks the
// /digest path and degrades to a no-op on any failure so deterministic blocks
// keep serving.
func (w *digestWorker) processOnce(ctx context.Context) (int, error) {
if !w.enabled() {
return 0, nil
Expand Down Expand Up @@ -266,7 +274,81 @@ func (w *digestWorker) processOnce(ctx context.Context) (int, error) {
continue
}

windows, err := w.store.candidateRollupWindows(ctx, sourceKind, channelID, w.cfg.WindowSize, w.cfg.MinWindow)
podExhausted := false
channelExhausted := false
messageWindows, err := w.store.candidateLongMessageWindows(ctx, sourceKind, channelID, w.cfg.LongMessageChars)
if err != nil {
return generated, err
}
for _, window := range messageWindows {
podOK, err = w.podBudgetAvailable(ctx, day)
if err != nil {
return generated, err
}
if !podOK {
podExhausted = true
break
}
channelOK, err = w.channelBudgetAvailable(ctx, day, channelID)
if err != nil {
return generated, err
}
if !channelOK {
channelExhausted = true
break
}

meta := blockMetadata{
Provider: w.cfg.Provider,
Model: w.cfg.Model,
Version: w.cfg.Version,
}
key := llmMessageSummaryBlockKey(sourceKind, channelID, window, meta)
fresh, err := w.store.freshBlockExists(ctx, key)
if err != nil {
return generated, err
}
if fresh {
// Cache hit: identical message id + content hash + compactor
// identity already summarized. No LLM call.
continue
}

if err := w.store.recordLLMUsage(ctx, day, channelID, 0); err != nil {
return generated, err
}
result, err := w.client.Summarize(ctx, promptForWindow(channelID, window))
if err != nil {
w.logf("channel-memory digest worker: summarize long message channel %s: %v", channelID, err)
_ = w.store.recordQueueFailure(ctx, sourceKind, channelID, window, err.Error())
continue
}
costUSD := w.costForResult(result)
result.CostUSD = costUSD
if costUSD != 0 {
if err := w.store.recordLLMCost(ctx, day, channelID, costUSD); err != nil {
return generated, err
}
}
if err := validateRollup(result, window); err != nil {
w.logf("channel-memory digest worker: rejected long-message summary for channel %s: %v", channelID, err)
_ = w.store.recordQueueFailure(ctx, sourceKind, channelID, window, err.Error())
continue
}
meta.CostUSD = result.CostUSD
if err := w.store.writeSparseRollup(ctx, key, sourceKind, channelID, window, result, meta, w.now()); err != nil {
return generated, err
}
generated++
}
if podExhausted {
break
}
if channelExhausted {
continue
}

windows, err := w.store.candidateRollupWindows(ctx, sourceKind, channelID, w.cfg.WindowSize, w.cfg.MinWindow, w.cfg.LongMessageChars)
if err != nil {
return generated, err
}
Expand Down Expand Up @@ -425,7 +507,14 @@ func promptForWindow(channelID string, window rollupWindow) llmDigestPrompt {

func validateRollup(result llmDigestResult, window rollupWindow) error {
switch result.Kind {
case rollupKindMessage:
if len(window.Sources) != 1 {
return fmt.Errorf("%s requires exactly one source, got %d", rollupKindMessage, len(window.Sources))
}
case rollupKindTopic, rollupKindSequence:
if len(window.Sources) == 1 {
return fmt.Errorf("single-message summaries must use %s", rollupKindMessage)
}
default:
return fmt.Errorf("unexpected rollup kind %q", result.Kind)
}
Expand Down Expand Up @@ -468,6 +557,23 @@ func llmRollupBlockKey(sourceKind, channelID string, window rollupWindow) string
return strings.Join([]string{"llm", sourceKind, channelID, hex.EncodeToString(sum[:])}, ":")
}

func llmMessageSummaryBlockKey(sourceKind, channelID string, window rollupWindow, meta blockMetadata) string {
if len(window.Sources) != 1 {
return llmRollupBlockKey(sourceKind, channelID, window)
}
source := window.Sources[0]
sum := sha256.Sum256([]byte(strings.Join([]string{
sourceKind,
channelID,
source.MessageID,
source.ContentHash,
meta.Provider,
meta.Model,
meta.Version,
}, "\x00")))
return strings.Join([]string{"llm_message", sourceKind, channelID, source.MessageID, hex.EncodeToString(sum[:])}, ":")
}

type blockMetadata struct {
Provider string `json:"provider"`
Model string `json:"model"`
Expand All @@ -479,7 +585,39 @@ type blockMetadata struct {
// non-forgotten raw_excerpt messages (the verbose, low-signal material) into
// ordered windows. Hard events and telemetry noise are excluded so they keep
// their faithful deterministic blocks.
func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceKind, channelID string, windowSize, minWindow int) ([]rollupWindow, error) {
func (s *channelMemoryStore) candidateLongMessageWindows(ctx context.Context, sourceKind, channelID string, minChars int) ([]rollupWindow, error) {
if minChars <= 0 {
return nil, nil
}
rows, err := s.db.QueryContext(ctx, `
SELECT id, source_kind, channel_id, message_id, content_hash, author_id, author_name,
created_at, edited_at, deleted, content, service, surface, guild_id, visibility_scope,
observed_seq, observed_at, is_current
FROM source_messages
WHERE source_kind = ? AND channel_id = ? AND is_current = 1 AND deleted = 0 AND forgotten_at = ''
ORDER BY created_at, observed_seq`,
sourceKind, channelID,
)
if err != nil {
return nil, err
}
defer rows.Close()

windows := make([]rollupWindow, 0)
for rows.Next() {
record, err := scanStoredSource(rows)
if err != nil {
return nil, err
}
if !isLongMessageSummaryCandidate(record.Content, minChars) {
continue
}
windows = append(windows, rollupWindow{Sources: []storedSourceMessage{record}})
}
return windows, rows.Err()
}

func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceKind, channelID string, windowSize, minWindow, longMessageChars int) ([]rollupWindow, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, source_kind, channel_id, message_id, content_hash, author_id, author_name,
created_at, edited_at, deleted, content, service, surface, guild_id, visibility_scope,
Expand All @@ -506,6 +644,9 @@ func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceK
if kind, _, _ := classifySourceContent(record.Content); kind != "raw_excerpt" {
continue
}
if isLongMessageSummaryCandidate(record.Content, longMessageChars) {
continue
}
eligible = append(eligible, record)
}
if err := rows.Err(); err != nil {
Expand All @@ -527,6 +668,21 @@ func (s *channelMemoryStore) candidateRollupWindows(ctx context.Context, sourceK
return windows, nil
}

func isLongMessageSummaryCandidate(content string, minChars int) bool {
if minChars <= 0 {
return false
}
trimmed := strings.TrimSpace(content)
if len([]rune(trimmed)) <= minChars {
return false
}
if isTelemetryNoise(trimmed) || isLowValueAcknowledgement(trimmed) {
return false
}
kind, _, _ := classifySourceContent(trimmed)
return kind == "raw_excerpt"
}

func (s *channelMemoryStore) freshBlockExists(ctx context.Context, blockKey string) (bool, error) {
var one int
err := s.db.QueryRowContext(ctx,
Expand Down Expand Up @@ -719,8 +875,9 @@ func (c *httpLLMClient) Summarize(ctx context.Context, prompt llmDigestPrompt) (
for _, m := range prompt.Messages {
fmt.Fprintf(&transcript, "[%s] (id=%s) %s: %s\n", m.CreatedAt, m.MessageID, m.Author, m.Content)
}
system := "You compress Discord channel transcripts into one compact digest block. " +
"Respond ONLY with a JSON object: {\"kind\":\"topic_rollup\"|\"sequence_rollup\",\"text\":string,\"source_messages\":[message id strings you summarized],\"score\":0..1}. " +
system := "You compress Discord channel transcripts into compact digest blocks. " +
"For one long source message, use kind \"message_summary\". For multi-message windows, use \"topic_rollup\" or \"sequence_rollup\". " +
"Respond ONLY with a JSON object: {\"kind\":\"message_summary\"|\"topic_rollup\"|\"sequence_rollup\",\"text\":string,\"source_messages\":[message id strings you summarized],\"score\":0..1}. " +
"source_messages MUST list the exact message ids you used as JSON strings, because Discord ids can be too large for safe JSON numbers. Do not invent ids."
payload := map[string]any{
"model": c.model,
Expand Down
Loading
Loading