
feat: periodic llm-based conversation history compaction#6625

Open
Jacobinwwey wants to merge 29 commits into AstrBotDevs:master from Jacobinwwey:feat/scheduled-context-compaction

Conversation


Jacobinwwey commented Mar 19, 2026

Motivation / 动机

AstrBot's existing llm_compress is mainly triggered only when a request exceeds the context limit, so long-conversation governance is reactive. The gaps are:

  1. Conversation history itself keeps growing, with no background convergence.
  2. Context can grow rapidly after tool calls, with no in-place compaction policy.
  3. No observability or operations entry points (status inspection, manual triggering).
  4. No controllable "top-level memory injection" to stabilize long-term conversations.

This PR introduces a unified set of context-governance capabilities for the local Agent Runner flows, closing the gaps above.


Scope / 范围说明

Although the title focuses on periodic compaction, this PR also includes the supporting capabilities required for production use (token counting, a post-tool-call compaction policy, context memory injection, and admin commands), forming a complete loop.


Key Changes / 核心改动

  1. Periodic conversation-history compaction scheduler
  • Adds astrbot/core/context_compaction_scheduler.py.
  • Periodically scans conversations, selecting candidate sessions by idle/message/token criteria.
  • Reuses ContextManager + llm_compress for multi-round compression and writes results back to conversations.content and token_usage.
  • Adds get_status() runtime info, exposing the most recent run report and errors.
  2. Lifecycle integration + admin operations
  • Hooks start/stop into astrbot/core/core_lifecycle.py.
  • New admin commands:
    • /ctxcompact status
    • /ctxcompact run [limit]
  • New context memory management commands:
    • /ctxmem status
    • /ctxmem ls
    • /ctxmem add <text>
    • /ctxmem rm <index>
    • /ctxmem clear
    • /ctxmem enable [on|off]
    • /ctxmem retrieval [on|off]
  3. Post-tool-call compaction policy
  • After tool-call results are written back, a compaction check can be triggered immediately (configurable).
  • Adds soft/hard thresholds and debounce control to avoid context spikes from high-frequency tool calls.
  4. Token counting modes
  • Adds provider_settings.context_token_counter_mode (estimate / tokenizer / auto).
  • tokenizer mode prefers tokenizer-based counting and falls back when the tokenizer is unavailable.
  5. Context memory (pinned top-level memory)
  • Adds the provider_settings.context_memory config domain.
  • Supports manually maintained pinned memories that are injected into the prompt.
  • retrieval_* options are reserved as extension points and disabled by default.
  6. Config/schema/dashboard alignment + tests
  • Updates the default config, schema, and dashboard metadata.
  • Adds or extends the following tests:
    • tests/unit/test_context_compaction_scheduler.py
    • tests/unit/test_context_compaction_command.py
    • tests/unit/test_context_memory.py
    • tests/unit/test_context_memory_command.py
    • tests/agent/test_token_counter.py
    • tests/unit/test_prompt_assembly_router.py
    • plus runner tests related to post-tool-call compaction.
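The tokenizer/estimate fallback in item 4 can be sketched roughly as follows. The function names and signatures here are illustrative, not AstrBot's actual API; the "about four characters per token" heuristic is a common assumption, not a value taken from this PR.

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic: roughly four characters per token for mixed text.
    return max(1, len(text) // 4)


def count_tokens(text: str, mode: str = "estimate", tokenizer=None) -> int:
    # "tokenizer" prefers exact counting but falls back to the estimate when
    # no tokenizer is available; "auto" uses a tokenizer only if one is present.
    if mode in ("tokenizer", "auto") and tokenizer is not None:
        return len(tokenizer.encode(text))
    return estimate_tokens(text)
```

The key property is that a misconfigured or missing tokenizer degrades to the estimate instead of raising, so token-gated features keep working.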

Config Highlights / 配置要点(默认值)

  • provider_settings.context_token_counter_mode = "estimate"
  • provider_settings.compact_context_after_tool_call = false
  • provider_settings.compact_context_soft_ratio = 0.3
  • provider_settings.compact_context_hard_ratio = 0.7
  • provider_settings.compact_context_debounce_seconds = 0
  • provider_settings.periodic_context_compaction.enabled = false
  • provider_settings.periodic_context_compaction.interval_minutes = 30
  • provider_settings.periodic_context_compaction.trigger_tokens = 0 (auto by ratio)
  • provider_settings.periodic_context_compaction.trigger_min_context_ratio = 0.3
  • provider_settings.context_memory.enabled = false
  • provider_settings.context_memory.inject_pinned_memory = true

All defaults are conservative; the new capabilities are opt-in by configuration.


Compatibility / 兼容性

  • This is NOT a breaking change.
  • Existing flows remain unchanged when new switches stay at defaults.

Verification / 验证

Local verification commands used during development:

  • ruff check for changed core/command/test modules
  • pytest -q tests/unit/test_context_compaction_scheduler.py tests/unit/test_context_compaction_command.py tests/unit/test_config.py

(Additional targeted tests for token counter / prompt assembly / context memory are included in this branch.)

@auto-assign auto-assign bot requested review from Fridemn and advent259141 March 19, 2026 11:49
@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Mar 19, 2026
@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances AstrBot's conversation management by upgrading its context handling from reactive, request-time overflow management to a proactive, periodic history compaction system. A new scheduler intelligently identifies and summarizes long-running conversation histories using LLMs, then stores these more concise versions, thereby optimizing resource usage and maintaining efficient context for extended interactions.

Highlights

  • Periodic Context Compaction Scheduler: Implemented a new core scheduler that periodically scans conversation histories, performs multi-round LLM summarization, and persists compacted history back to the database.
  • Core Lifecycle Integration: Integrated the new PeriodicContextCompactionScheduler into the application's core lifecycle, ensuring it starts as a background task and stops gracefully on shutdown.
  • Configurable Parameters: Introduced a comprehensive set of configurable parameters for the compaction process, including interval, startup delay, conversation limits, idle time, message/token thresholds, and dry-run mode.
  • Unit Tests: Added unit tests to validate the functionality of the context compaction scheduler, covering configuration loading, message sanitization, and idle time checks.

@dosubot dosubot bot added the area:core The bug / feature is about astrbot's core, backend label Mar 19, 2026

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • In PeriodicContextCompactionScheduler.run, wait_seconds is computed as max(5, int(cfg["interval_minutes"])) * 60, which means configured intervals below 5 minutes are silently ignored; consider relying on the normalized interval_minutes (already clamped with _to_int) so that smaller intervals work as expected.
Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/config/default.py" line_range="99-108" />
<code_context>
         ),
         "llm_compress_keep_recent": 6,
         "llm_compress_provider_id": "",
+        "periodic_context_compaction": {
+            "enabled": False,
+            "interval_minutes": 30,
</code_context>
<issue_to_address>
**issue (bug_risk):** The schema for `periodic_context_compaction` uses `items` for an object, which may be incorrect and can break validation/GUI generation.

In the provider template, this field is defined as an `object` but uses `items` instead of `properties` (which the rest of the file uses for objects). If the config system expects `properties`, the nested fields here likely won’t be validated or exposed in the UI. Please align this structure with the other object definitions in the schema.
</issue_to_address>

### Comment 2
<location path="tests/unit/test_context_compaction_scheduler.py" line_range="23-32" />
<code_context>
+    )
+
+
+def test_load_config_normalizes_values() -> None:
+    scheduler = _build_scheduler(
+        {
+            "periodic_context_compaction": {
+                "enabled": "true",
+                "interval_minutes": "0",
+                "target_tokens": 1024,
+                "trigger_tokens": 1000,
+                "max_rounds": "2",
+            }
+        }
+    )
+
+    cfg = scheduler._load_config()
+
+    assert cfg["enabled"] is True
+    assert cfg["interval_minutes"] == 1
+    assert cfg["target_tokens"] == 1024
+    assert cfg["trigger_tokens"] == 1025
+    assert cfg["max_rounds"] == 2
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add more configuration normalization tests for edge and invalid values.

`test_load_config_normalizes_values` only exercises the happy path, while `_load_config` also handles a lot of bounds/fallback logic (negative/non-numeric values, ordering between `trigger_tokens` and `target_tokens`, minimum thresholds, etc.). Please add parametrized tests to cover:
- Non-boolean/boolean-like `enabled` values (e.g. `"yes"`, `"0"`, unknown strings) and their fallbacks.
- Values below minimums (e.g. `interval_minutes=0`, `scan_page_size=1`, `min_messages=0`, extremely small `target_tokens`) and how they are clamped.
- Cases where `trigger_tokens` is omitted or less than `target_tokens`, and how it is derived/adjusted.
- Non-dict `periodic_context_compaction` values falling back to `_DEFAULTS`.
These will better exercise the config normalization and reduce risk of misconfiguration issues in production.

Suggested implementation:

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace

import pytest

from astrbot.core.context_compaction_scheduler import PeriodicContextCompactionScheduler

```

```python
class DummyConfigManager:
    def __init__(self, default_conf: dict):
        self.default_conf = default_conf


@pytest.mark.parametrize(
    "enabled_value, expected",
    [
        ("true", True),
        ("false", False),
        ("1", True),
        ("0", False),
        ("yes", True),
        ("no", False),
        ("", True),
        ("not-a-bool", True),
    ],
)
def test_load_config_normalizes_enabled_values(enabled_value, expected) -> None:
    scheduler = _build_scheduler(
        {
            "periodic_context_compaction": {
                "enabled": enabled_value,
            }
        }
    )

    cfg = scheduler._load_config()

    assert cfg["enabled"] is expected


@pytest.mark.parametrize(
    "raw, expected_interval, expected_scan_page_size, expected_min_messages",
    [
        (
            {"interval_minutes": 0, "scan_page_size": 1, "min_messages": 0},
            1,
            2,
            1,
        ),
        (
            {"interval_minutes": -5, "scan_page_size": -10, "min_messages": -1},
            1,
            2,
            1,
        ),
        (
            {"interval_minutes": "0", "scan_page_size": "1", "min_messages": "0"},
            1,
            2,
            1,
        ),
    ],
)
def test_load_config_clamps_minimum_numeric_values(
    raw, expected_interval, expected_scan_page_size, expected_min_messages
) -> None:
    scheduler = _build_scheduler({"periodic_context_compaction": raw})

    cfg = scheduler._load_config()

    assert cfg["interval_minutes"] == expected_interval
    assert cfg["scan_page_size"] == expected_scan_page_size
    assert cfg["min_messages"] == expected_min_messages


@pytest.mark.parametrize(
    "raw, expected_target, expected_trigger",
    [
        # trigger omitted -> derived from target
        ({"target_tokens": 1024}, 1024, 1024),
        # trigger explicitly None -> derived from target
        ({"target_tokens": 1024, "trigger_tokens": None}, 1024, 1024),
        # trigger lower than target -> raised to target
        ({"target_tokens": 1024, "trigger_tokens": 512}, 1024, 1024),
        # trigger lower than target, string input -> raised to target
        ({"target_tokens": 1024, "trigger_tokens": "512"}, 1024, 1024),
        # trigger greater than/equal to target -> kept as is
        ({"target_tokens": 1024, "trigger_tokens": 2048}, 1024, 2048),
        # extremely small target -> clamped to minimum
        ({"target_tokens": 10}, 64, 64),
    ],
)
def test_load_config_normalizes_trigger_and_target_tokens(
    raw, expected_target, expected_trigger
) -> None:
    scheduler = _build_scheduler({"periodic_context_compaction": raw})

    cfg = scheduler._load_config()

    assert cfg["target_tokens"] == expected_target
    assert cfg["trigger_tokens"] == expected_trigger


@pytest.mark.parametrize(
    "value",
    [
        None,
        1,
        "not-a-dict",
        [],
    ],
)
def test_load_config_falls_back_to_defaults_for_non_mapping(value) -> None:
    scheduler = _build_scheduler({"periodic_context_compaction": value})

    cfg = scheduler._load_config()

    assert cfg == scheduler._DEFAULTS

```

The expectations in these tests assume specific normalization rules inside `PeriodicContextCompactionScheduler._load_config`:
  1. Boolean-like parsing: `"true"/"1"/"yes"` → `True`, `"false"/"0"/"no"` → `False`, and unknown/empty values fall back to the default for `enabled` (assumed to be `True` here). If your implementation uses a different default or mapping, update the `expected` column in `test_load_config_normalizes_enabled_values` accordingly.
2. Minimum clamping: `interval_minutes` is clamped to at least `1`, `scan_page_size` to at least `2`, and `min_messages` to at least `1`. If your actual minima differ, change the `expected_*` values in `test_load_config_clamps_minimum_numeric_values`.
3. Token behavior: `target_tokens` is clamped to at least `64`, and `trigger_tokens` defaults to `target_tokens` when omitted/invalid, and is raised to at least `target_tokens` when smaller. If your code uses different minima or derivation rules, adjust the expectations in `test_load_config_normalizes_trigger_and_target_tokens`.
4. Defaults: `scheduler._DEFAULTS` is assumed to be the canonical normalized default configuration for `periodic_context_compaction`. If the defaults are exposed differently (e.g., as a module-level constant), replace `scheduler._DEFAULTS` in `test_load_config_falls_back_to_defaults_for_non_mapping` with the appropriate reference.
5. Ensure that `pytest` is not already imported elsewhere in this file to avoid linter warnings about duplicate imports; if it is, you can omit the added `import pytest` edit.
</issue_to_address>

### Comment 3
<location path="astrbot/core/context_compaction_scheduler.py" line_range="198" />
<code_context>
+        except asyncio.TimeoutError:
+            return
+
+    def _load_config(self) -> dict[str, Any]:
+        default_conf = self.config_manager.default_conf
+        provider_settings = default_conf.get("provider_settings", {})
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helpers and a configuration type to break up `_load_config`, `_compact_one_conversation`, and `_sanitize_content` so this scheduler stays an orchestration layer rather than a monolith of mixed concerns.

You can reduce complexity without changing behavior by extracting a few focused helpers/types, while keeping this file as the main orchestrator.

### 1. Config normalization repetition

`_load_config` has a lot of repeated int/bool normalization patterns. You can centralize that logic with a small helper or dataclass to both tighten the method and make constraints obvious.

Example using a small helper (minimal change):

```python
def _normalize_int(
    self,
    cfg: dict[str, Any],
    key: str,
    default: int,
    min_value: int,
) -> int:
    cfg[key] = self._to_int(cfg.get(key), default, min_value)
    return cfg[key]

def _load_config(self) -> dict[str, Any]:
    default_conf = self.config_manager.default_conf
    provider_settings = default_conf.get("provider_settings", {})
    raw_cfg = provider_settings.get("periodic_context_compaction", {}) or {}

    if not isinstance(raw_cfg, dict):
        raw_cfg = {}

    cfg = dict(self._DEFAULTS)
    cfg.update(raw_cfg)

    cfg["enabled"] = self._to_bool(cfg.get("enabled"), False)
    self._normalize_int(cfg, "interval_minutes", 30, 1)
    self._normalize_int(cfg, "startup_delay_seconds", 120, 0)
    self._normalize_int(cfg, "max_conversations_per_run", 8, 1)
    self._normalize_int(cfg, "max_scan_per_run", 120, 1)
    self._normalize_int(cfg, "scan_page_size", 40, 10)
    self._normalize_int(cfg, "min_idle_minutes", 15, 0)
    self._normalize_int(cfg, "min_messages", 14, 2)
    self._normalize_int(cfg, "target_tokens", 4096, 512)

    trigger_default = max(int(cfg["target_tokens"] * 1.5), cfg["target_tokens"] + 1)
    self._normalize_int(cfg, "trigger_tokens", trigger_default, 512)
    if cfg["trigger_tokens"] <= cfg["target_tokens"]:
        cfg["trigger_tokens"] = cfg["target_tokens"] + 1

    self._normalize_int(cfg, "max_rounds", 3, 1)
    self._normalize_int(cfg, "truncate_turns", 1, 1)
    self._normalize_int(cfg, "keep_recent", 6, 0)

    cfg["provider_id"] = str(cfg.get("provider_id", "") or "").strip()
    cfg["instruction"] = str(cfg.get("instruction", "") or "").strip()
    cfg["dry_run"] = self._to_bool(cfg.get("dry_run"), False)

    return cfg
```

Or go one step further with a dataclass for stronger typing and clearer semantics, while still using the existing dict config externally:

```python
@dataclass
class CompactionConfig:
    enabled: bool
    interval_minutes: int
    startup_delay_seconds: int
    max_conversations_per_run: int
    max_scan_per_run: int
    scan_page_size: int
    min_idle_minutes: int
    min_messages: int
    target_tokens: int
    trigger_tokens: int
    max_rounds: int
    truncate_turns: int
    keep_recent: int
    provider_id: str
    instruction: str
    dry_run: bool

    @classmethod
    def from_raw(cls, raw: dict[str, Any], defaults: dict[str, Any]) -> "CompactionConfig":
        cfg = dict(defaults)
        cfg.update(raw or {})

        def to_int(v: Any, d: int, min_v: int) -> int:
            try:
                val = int(v)
            except Exception:
                val = d
            return max(val, min_v)

        enabled = _PeriodicUtils.to_bool(cfg.get("enabled"), False)
        interval_minutes = to_int(cfg.get("interval_minutes"), 30, 1)
        # ...other fields...

        # compute trigger_tokens using interval_minutes/target_tokens as now
        # ...

        return cls(
            enabled=enabled,
            interval_minutes=interval_minutes,
            # ...
            dry_run=_PeriodicUtils.to_bool(cfg.get("dry_run"), False),
        )
```

Then `_load_config` becomes a thin wrapper:

```python
def _load_config(self) -> CompactionConfig:
    provider_settings = self.config_manager.default_conf.get("provider_settings", {})
    raw_cfg = provider_settings.get("periodic_context_compaction", {})
    return CompactionConfig.from_raw(raw_cfg if isinstance(raw_cfg, dict) else {}, self._DEFAULTS)
```

You can still use it like a dict in existing calls by either:
- updating call sites to use attributes (`cfg.max_rounds`), or
- exposing `to_dict()` if you want to keep the rest of the code unchanged for now.

### 2. Breaking down `_compact_one_conversation`

`_compact_one_conversation` mixes eligibility checks, compaction loop, and persistence. Splitting the phases into small private helpers will reduce nesting and clarify the flow while preserving behavior.

For example:

```python
async def _compact_one_conversation(
    self,
    conv: ConversationV2,
    cfg: dict[str, Any],
) -> str:
    eligible, messages, before_tokens = self._check_eligibility(conv, cfg)
    if not eligible:
        return "skipped"

    provider = await self._resolve_provider(cfg, conv.user_id)
    if not provider:
        return "failed"

    compressed, changed, rounds = await self._run_compaction_rounds(
        messages, provider, cfg
    )
    if not changed:
        return "skipped"

    after_tokens = self._token_counter.count_tokens(compressed)
    if after_tokens >= before_tokens:
        return "skipped"

    if cfg["dry_run"]:
        self._log_dry_run(conv, before_tokens, after_tokens, rounds)
        return "compacted"

    if not await self._persist_compaction(conv, compressed, after_tokens):
        return "failed"

    self._log_compaction(conv, before_tokens, after_tokens, rounds)
    return "compacted"
```

With very small helpers:

```python
def _check_eligibility(
    self, conv: ConversationV2, cfg: dict[str, Any]
) -> tuple[bool, list[Message], int]:
    history = conv.content
    if not isinstance(history, list) or len(history) < cfg["min_messages"]:
        return False, [], 0
    if not self._is_idle_enough(conv.updated_at, cfg["min_idle_minutes"]):
        return False, [], 0

    messages = self._parse_history(history)
    if len(messages) < cfg["min_messages"]:
        return False, [], 0

    trusted_usage = conv.token_usage if isinstance(conv.token_usage, int) else 0
    before_tokens = self._token_counter.count_tokens(messages, trusted_usage)
    if before_tokens < cfg["trigger_tokens"]:
        return False, [], 0

    return True, messages, before_tokens
```

```python
async def _run_compaction_rounds(
    self,
    messages: list[Message],
    provider: Provider,
    cfg: dict[str, Any],
) -> tuple[list[Message], bool, int]:
    compressed = messages
    changed = False
    rounds = 0

    for _ in range(cfg["max_rounds"]):
        current_tokens = self._token_counter.count_tokens(compressed)
        if current_tokens <= cfg["target_tokens"]:
            break

        manager = ContextManager(
            ContextConfig(
                max_context_tokens=cfg["target_tokens"],
                enforce_max_turns=-1,
                truncate_turns=cfg["truncate_turns"],
                llm_compress_keep_recent=cfg["keep_recent"],
                llm_compress_instruction=self._resolve_instruction(cfg),
                llm_compress_provider=provider,
            )
        )

        rounds += 1
        next_messages = await manager.process(compressed)
        if self._messages_equal(compressed, next_messages):
            break

        compressed = next_messages
        changed = True

    return compressed, changed, rounds
```

This keeps all logic, but makes each piece easier to test and reason about.

### 3. Isolating the content sanitization logic

`_sanitize_content` is handling multiple shapes and part types; it can be split into content-type-specific helpers without changing behavior.

For example:

```python
def _sanitize_content(self, content: Any, role: str) -> str | list[dict] | None:
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        return self._sanitize_list_content(content)
    if content is None:
        return None if role == "assistant" else ""
    dumped = self._safe_json(content)
    return dumped if dumped is not None else str(content)

def _sanitize_list_content(self, content: list[Any]) -> str | list[dict]:
    parts: list[dict[str, Any]] = []
    fallback_texts: list[str] = []

    for part in content:
        if isinstance(part, str):
            if part.strip():
                fallback_texts.append(part)
            continue
        if not isinstance(part, dict):
            txt = self._safe_json(part)
            if txt:
                fallback_texts.append(txt)
            continue
        self._sanitize_part(part, parts, fallback_texts)

    if fallback_texts:
        parts.insert(0, {"type": "text", "text": "\n".join(fallback_texts)})

    if parts:
        return parts
    return ""
```

```python
def _sanitize_part(
    self,
    part: dict[str, Any],
    parts: list[dict[str, Any]],
    fallback_texts: list[str],
) -> None:
    part_type = str(part.get("type", "")).strip()
    if part_type == "text":
        text_val = part.get("text")
        if text_val is not None:
            parts.append({"type": "text", "text": str(text_val)})
        return

    if part_type == "image_url":
        image_obj = part.get("image_url")
        if isinstance(image_obj, dict) and image_obj.get("url"):
            image_part: dict[str, Any] = {
                "type": "image_url",
                "image_url": {"url": str(image_obj.get("url"))},
            }
            if image_obj.get("id"):
                image_part["image_url"]["id"] = str(image_obj.get("id"))
            parts.append(image_part)
        return

    if part_type == "audio_url":
        audio_obj = part.get("audio_url")
        if isinstance(audio_obj, dict) and audio_obj.get("url"):
            audio_part: dict[str, Any] = {
                "type": "audio_url",
                "audio_url": {"url": str(audio_obj.get("url"))},
            }
            if audio_obj.get("id"):
                audio_part["audio_url"]["id"] = str(audio_obj.get("id"))
            parts.append(audio_part)
        return

    if part_type == "think":
        think = part.get("think")
        if think:
            fallback_texts.append(str(think))
        return

    raw_text = part.get("text") or part.get("content")
    if raw_text:
        fallback_texts.append(str(raw_text))
    else:
        dumped = self._safe_json(part)
        if dumped:
            fallback_texts.append(dumped)
```

This keeps the behavior intact, but flattens some of the nested `if` logic and makes it easier to extend (e.g., add new `type` handlers) without growing `_sanitize_content` further.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new feature for periodic LLM-based conversation history compaction. The implementation includes a new scheduler, configuration defaults, schema definitions, and dashboard metadata. The core logic for scanning, compacting, and persisting conversation history is well-structured, with appropriate error handling and logging. The integration into the core lifecycle for startup and graceful shutdown is also correctly handled. Unit tests cover important aspects of configuration normalization and message sanitization, which is good for ensuring the reliability of the new feature.

Comment on lines +235 to +236:

```python
if cfg["trigger_tokens"] <= cfg["target_tokens"]:
    cfg["trigger_tokens"] = cfg["target_tokens"] + 1
```


Severity: medium

The logic here to ensure trigger_tokens is greater than target_tokens is redundant. The max function on line 232 already ensures that trigger_tokens will be at least target_tokens + 1, making the subsequent if statement unnecessary.

Suggested change:

```diff
-        if cfg["trigger_tokens"] <= cfg["target_tokens"]:
-            cfg["trigger_tokens"] = cfg["target_tokens"] + 1
+        cfg["trigger_tokens"] = self._to_int(
+            cfg.get("trigger_tokens"),
+            max(int(cfg["target_tokens"] * 1.5), cfg["target_tokens"] + 1),
+            512,
+        )
```

@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. and removed size:XL This PR changes 500-999 lines, ignoring generated files. labels Mar 19, 2026

sourcery-ai bot commented Mar 19, 2026

Sure! I'm generating a new review now.


@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="astrbot/core/context_compaction_scheduler.py" line_range="333-335" />
<code_context>
+        if after_tokens >= eligibility.before_tokens:
+            return "skipped"
+
+        if cfg["dry_run"]:
+            self._log_dry_run(conv, eligibility.before_tokens, after_tokens, round_result)
+            return "compacted"
+
+        persisted = await self._persist_compacted_history(
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Dry-run mode is reported as `compacted`, which can make metrics/logs ambiguous.

Because this path returns "compacted" in dry-run mode, `run_once` will treat these as successful compactions even though no state changed. That skews metrics and makes operational monitoring confusing. Please either introduce a distinct outcome (e.g., "dry_run") or reuse an existing non-compaction outcome (e.g., "skipped") while keeping the detailed dry-run logging, so stats only represent real persistence work.

Suggested implementation:

```python
        if cfg["dry_run"]:
            self._log_dry_run(conv, eligibility.before_tokens, after_tokens, round_result)
            return "skipped"

```

If there are any tests or metrics dashboards that assert/visualize a `"compacted"` outcome count including dry-run executions, they should be updated to expect `"skipped"` for dry-run mode so that only real persistence work is counted as `"compacted"`.
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_compaction_scheduler.py" line_range="153" />
<code_context>
+    async def stop(self) -> None:
+        self._stop_event.set()
+
+    async def run_once(
+        self,
+        reason: str = "manual",
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the pagination/compaction loop in `run_once` and the history parsing/sanitization logic into focused helpers or a separate parser class to make the scheduler easier to read and maintain without changing behavior.

You can reduce the complexity meaningfully without changing behavior by extracting a couple of focused helpers. Two places give you the most value for the least churn: the `run_once` loop and the history parsing.

---

### 1. Flatten `run_once` by extracting iteration + compaction helpers

`run_once` currently mixes pagination, limits, eligibility, compaction, and stats. You already have `_compact_one_conversation`; a small iterator plus a “status” wrapper makes the method much easier to scan.

Example refactor:

```python
# new helper (an async generator, hence the AsyncIterator annotation;
# import it via `from collections.abc import AsyncIterator`)
async def _iter_candidate_conversations(
    self,
    max_to_scan: int,
    scan_page_size: int,
) -> AsyncIterator[ConversationV2]:
    page = 1
    scanned = 0

    while not self._stop_event.is_set() and scanned < max_to_scan:
        conversations, total = await self.conversation_manager.db.get_filtered_conversations(
            page=page,
            page_size=scan_page_size,
        )
        if not conversations:
            break

        for conv in conversations:
            if self._stop_event.is_set() or scanned >= max_to_scan:
                return
            scanned += 1
            yield conv

        if page * scan_page_size >= total:
            break
        page += 1
```

Then you can flatten `run_once` to something like:

```python
async def run_once(
    self,
    reason: str = "manual",
    max_conversations_override: int | None = None,
) -> dict[str, Any]:
    async with self._running_lock:
        self._last_started_at = self._now_iso()
        cfg = self._load_config()
        started = time.monotonic()
        stats = _CompactionStats()

        if not cfg["enabled"] and reason == "scheduled":
            # ... existing early-return block unchanged ...

        max_to_compact = max(1, int(max_conversations_override or cfg["max_conversations_per_run"]))
        max_to_scan = max(max_to_compact, int(cfg["max_scan_per_run"]))
        scan_page_size = max(10, int(cfg["scan_page_size"]))

        async for conv in self._iter_candidate_conversations(max_to_scan, scan_page_size):
            if stats.compacted >= max_to_compact:
                break

            outcome = await self._compact_one_conversation(conv, cfg)
            stats.scanned += 1

            if outcome == "compacted":
                stats.compacted += 1
            elif outcome == "skipped":
                stats.skipped += 1
            else:
                stats.failed += 1

        elapsed = time.monotonic() - started
        report = {
            "reason": reason,
            "scanned": stats.scanned,
            "compacted": stats.compacted,
            "skipped": stats.skipped,
            "failed": stats.failed,
            "elapsed_sec": elapsed,
        }
        self._last_report = report
        self._last_finished_at = self._now_iso()
        self._last_error = None
        return report
```

This keeps all behavior but makes the main loop more obviously “limits → iterate → compact → stats”.

---

### 2. Extract message parsing/sanitization into a helper class

The `_parse_history` / `_sanitize_*` methods are a sizable chunk of this class and are conceptually separate from scheduling/compaction. Pulling them behind a tiny interface removes a lot of noise from `PeriodicContextCompactionScheduler` and makes them independently testable.

Example minimal extraction:

```python
# new module/class, e.g. message_history_parser.py

from collections.abc import Iterable
from typing import Any
from astrbot.core.agent.message import Message

class MessageHistoryParser:
    def parse(self, history: Iterable[Any]) -> list[Message]:
        parsed: list[Message] = []
        for item in history:
            if not isinstance(item, dict):
                continue

            try:
                parsed.append(Message.model_validate(item))
                continue
            except Exception:
                pass

            fallback = self._sanitize_message_dict(item)
            if not fallback:
                continue
            try:
                parsed.append(Message.model_validate(fallback))
            except Exception:
                continue

        return parsed

    # move _sanitize_message_dict, _sanitize_content, _sanitize_list_content,
    # _sanitize_content_part, _safe_json here as methods (unchanged bodies)
```

Then inject/use it in the scheduler:

```python
class PeriodicContextCompactionScheduler:
    def __init__(
        self,
        config_manager: AstrBotConfigManager,
        conversation_manager: ConversationManager,
        provider_manager: ProviderManager,
        history_parser: MessageHistoryParser | None = None,
    ) -> None:
        self.config_manager = config_manager
        self.conversation_manager = conversation_manager
        self.provider_manager = provider_manager
        self.history_parser = history_parser or MessageHistoryParser()
        # ...

    def _check_eligibility(
        self,
        conv: ConversationV2,
        cfg: dict[str, Any],
    ) -> _EligibilityResult:
        history = conv.content
        if not isinstance(history, list) or len(history) < cfg["min_messages"]:
            return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

        if not self._is_idle_enough(conv.updated_at, cfg["min_idle_minutes"]):
            return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

        messages = self.history_parser.parse(history)
        # ... rest unchanged ...
```

This keeps all behavior intact but shrinks the scheduler’s surface area and clusters all parsing/sanitization logic in one dedicated place.

---

If you want a further small win later, `_load_config` could build a simple `@dataclass` config object instead of mutating a dict in place; that would also let you move `_to_bool` / `_to_int` to a shared config utils module.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@sourcery-ai
Contributor

sourcery-ai bot commented Mar 19, 2026

Hey @Jacobinwwey, I've posted a new review for you!

@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • The periodic context compaction defaults are hard-coded both in config/default.py and in PeriodicContextCompactionScheduler._DEFAULTS; consider deriving the scheduler defaults from the config (or a shared constant) to avoid silent divergence when one side is changed.
  • _iter_candidate_conversations paginates and then filters purely in Python; if possible, push the idle-time (updated_at) and min-message thresholds down into get_filtered_conversations (or add a dedicated query) so the DB can pre-filter candidates and avoid scanning lots of ineligible rows each run.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The periodic context compaction defaults are hard-coded both in `config/default.py` and in `PeriodicContextCompactionScheduler._DEFAULTS`; consider deriving the scheduler defaults from the config (or a shared constant) to avoid silent divergence when one side is changed.
- `_iter_candidate_conversations` paginates and then filters purely in Python; if possible, push the idle-time (`updated_at`) and min-message thresholds down into `get_filtered_conversations` (or add a dedicated query) so the DB can pre-filter candidates and avoid scanning lots of ineligible rows each run.

## Individual Comments

### Comment 1
<location path="tests/unit/test_context_compaction_command.py" line_range="42-51" />
<code_context>
+    assert "compacted=2" in text
+
+
+@pytest.mark.asyncio
+async def test_run_with_invalid_limit() -> None:
+    scheduler = _build_scheduler()
+    scheduler.run_once = AsyncMock()
+
+    command = ContextCompactionCommands(
+        context=SimpleNamespace(context_compaction_scheduler=scheduler)
+    )
+    event = SimpleNamespace(send=AsyncMock())
+
+    await command.run(event, 0)
+
+    scheduler.run_once.assert_not_awaited()
+    chain = event.send.await_args.args[0]
+    assert "limit 必须 >= 1" in chain.get_plain_text(with_other_comps_mark=True)
+
+
+@pytest.mark.asyncio
+async def test_run_triggers_scheduler_once() -> None:
+    scheduler = _build_scheduler()
+    scheduler.run_once = AsyncMock(
</code_context>
<issue_to_address>
**suggestion (testing):** Extend command tests to cover error reporting and status `last_error` formatting

Current tests cover the happy path for `status`/`run`, plus "scheduler unavailable" and invalid `limit`. To better validate the admin interface, please also add:

- A `run` test where `scheduler.run_once` raises, asserting `event.send` is called with a message starting with `"触发压缩失败:"` and that `run_once` is awaited exactly once.
- A `status` test where `get_status()` returns a non-empty `last_error` (with `last_report` still set), asserting the output includes the `最近错误:...` line alongside the normal summary.

This will exercise the error-reporting paths, not just successful execution.
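The first suggested test could look roughly like the sketch below. The real `ContextCompactionCommands` and its message-chain API are assumed from the existing tests in this file; here a stub command reproduces only the error path so the sketch stays self-contained:

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock

# Stand-in for the command's error path. The real class catches
# exceptions from run_once and reports them with the "触发压缩失败:"
# prefix; only that shape is reproduced here.
class StubCommand:
    def __init__(self, context):
        self.context = context

    async def run(self, event, limit: int):
        scheduler = self.context.context_compaction_scheduler
        try:
            await scheduler.run_once(max_conversations_override=limit)
        except Exception as exc:
            await event.send(f"触发压缩失败:{exc}")

async def main() -> str:
    scheduler = SimpleNamespace(run_once=AsyncMock(side_effect=RuntimeError("boom")))
    command = StubCommand(SimpleNamespace(context_compaction_scheduler=scheduler))
    event = SimpleNamespace(send=AsyncMock())

    await command.run(event, 1)

    # run_once is awaited exactly once, and the user sees the error prefix.
    scheduler.run_once.assert_awaited_once()
    return event.send.await_args.args[0]

message = asyncio.run(main())
```

The real test would send a message chain rather than a plain string, but the two assertions (single await of `run_once`, error prefix in the reply) carry over directly.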
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_compaction_scheduler.py" line_range="697" />
<code_context>
+            at = at.replace(tzinfo=timezone.utc)
+        return (now - at).total_seconds() >= (min_idle_minutes * 60)
+
+    def _parse_history(self, history: Iterable[Any]) -> list[Message]:
+        return self._history_parser.parse(history)
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider inlining the trivial history parser wrappers and encapsulating config normalization in a dedicated typed config dataclass to reduce the scheduler’s API surface and mental overhead.

You can cut a good amount of surface-area and cognitive load with a couple of small, local refactors without changing behavior.

### 1. Drop the redundant parser wrappers

These methods don’t add invariants or behavior:

```python
def _parse_history(self, history: Iterable[Any]) -> list[Message]:
    return self._history_parser.parse(history)

def _sanitize_message_dict(self, item: dict[str, Any]) -> dict[str, Any] | None:
    return self._history_parser.sanitize_message_dict(item)

def _sanitize_content(self, content: Any, role: str) -> str | list[dict] | None:
    return self._history_parser.sanitize_content(content, role)

def _sanitize_list_content(self, content: list[Any]) -> str | list[dict]:
    return self._history_parser.sanitize_list_content(content)

def _sanitize_content_part(
    self,
    part: dict[str, Any],
    parts: list[dict[str, Any]],
    fallback_texts: list[str],
) -> None:
    self._history_parser.sanitize_content_part(part, parts, fallback_texts)
```

You can call the parser directly where used and delete these wrappers:

```python
def _check_eligibility(
    self,
    conv: ConversationV2,
    cfg: dict[str, Any],
) -> _EligibilityResult:
    history = conv.content
    if not isinstance(history, list) or len(history) < cfg["min_messages"]:
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    if not self._is_idle_enough(conv.updated_at, cfg["min_idle_minutes"]):
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    messages = self._history_parser.parse(history)  # direct call
    ...
```

This reduces the class API and makes it clearer that parsing is a separate concern.

If you want a slightly stronger separation, you can also move `_MessageHistoryParser` into its own module (no behavior change, just location):

```python
# astrbot/core/agent/context/history_parser.py
class MessageHistoryParser:
    ...

# periodic scheduler module
from astrbot.core.agent.context.history_parser import MessageHistoryParser

class PeriodicContextCompactionScheduler:
    def __init__(...):
        ...
        self._history_parser = MessageHistoryParser()
```

### 2. Simplify config normalization into a typed config object

Right now `_load_config` spreads logic across multiple helpers. You can keep behavior but localize the normalization into a small dataclass:

```python
@dataclass
class PeriodicContextCompactionConfig:
    enabled: bool
    interval_minutes: int
    startup_delay_seconds: int
    max_conversations_per_run: int
    max_scan_per_run: int
    scan_page_size: int
    min_idle_minutes: int
    min_messages: int
    target_tokens: int
    trigger_tokens: int
    max_rounds: int
    truncate_turns: int
    keep_recent: int
    provider_id: str
    instruction: str
    dry_run: bool

    @classmethod
    def from_raw(cls, raw: dict[str, Any], defaults: dict[str, Any]) -> "PeriodicContextCompactionConfig":
        cfg = {**defaults, **(raw or {})}

        def to_bool(value: Any, default: bool) -> bool:
            if isinstance(value, bool):
                return value
            if isinstance(value, (int, float)):
                return bool(value)
            if isinstance(value, str):
                lowered = value.strip().lower()
                if lowered in {"1", "true", "yes", "on"}:
                    return True
                if lowered in {"0", "false", "no", "off"}:
                    return False
            return default

        def to_int(value: Any, default: int, min_value: int) -> int:
            try:
                parsed = int(value)
            except Exception:
                parsed = default
            return max(parsed, min_value)

        target_tokens = to_int(cfg.get("target_tokens"), 4096, 512)
        raw_trigger = raw.get("trigger_tokens")
        trigger_default = max(int(target_tokens * 1.5), target_tokens + 1)
        if raw_trigger is None or (isinstance(raw_trigger, str) and not raw_trigger):
            trigger_tokens = trigger_default
        else:
            trigger_tokens = to_int(raw_trigger, trigger_default, 512)
        if trigger_tokens <= target_tokens:
            trigger_tokens = target_tokens + 1

        return cls(
            enabled=to_bool(cfg.get("enabled"), False),
            interval_minutes=to_int(cfg.get("interval_minutes"), 30, 1),
            startup_delay_seconds=to_int(cfg.get("startup_delay_seconds"), 120, 0),
            max_conversations_per_run=to_int(cfg.get("max_conversations_per_run"), 8, 1),
            max_scan_per_run=to_int(cfg.get("max_scan_per_run"), 120, 1),
            scan_page_size=to_int(cfg.get("scan_page_size"), 40, 10),
            min_idle_minutes=to_int(cfg.get("min_idle_minutes"), 15, 0),
            min_messages=to_int(cfg.get("min_messages"), 14, 2),
            target_tokens=target_tokens,
            trigger_tokens=trigger_tokens,
            max_rounds=to_int(cfg.get("max_rounds"), 3, 1),
            truncate_turns=to_int(cfg.get("truncate_turns"), 1, 1),
            keep_recent=to_int(cfg.get("keep_recent"), 6, 0),
            provider_id=str(cfg.get("provider_id", "") or "").strip(),
            instruction=str(cfg.get("instruction", "") or "").strip(),
            dry_run=to_bool(cfg.get("dry_run"), False),
        )
```

Then `_load_config` becomes a thin wrapper:

```python
def _load_config(self) -> PeriodicContextCompactionConfig:
    raw = self._load_raw_config()
    return PeriodicContextCompactionConfig.from_raw(raw, self._DEFAULTS)
```

You can keep the external `get_status` contract by converting back to `dict` where needed:

```python
def get_status(self) -> dict[str, Any]:
    cfg = self._load_config()
    return {
        "running": self._running_lock.locked(),
        "bootstrapped": self._bootstrapped,
        "stop_requested": self._stop_event.is_set(),
        "config": cfg.__dict__,
        ...
    }
```

This keeps all existing behavior but moves the normalization into a single, typed place and lets you delete `_normalize_int`, `_normalize_trigger_tokens`, `_to_bool`, and `_to_int` from the scheduler class, reducing its responsibilities.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location path="tests/unit/test_context_compaction_command.py" line_range="42-51" />
<code_context>
+    )
+
+
+@pytest.mark.asyncio
+async def test_status_when_scheduler_unavailable() -> None:
+    command = ContextCompactionCommands(context=SimpleNamespace())
+    event = SimpleNamespace(send=AsyncMock())
+
+    await command.status(event)
+
+    event.send.assert_awaited_once()
+    chain = event.send.await_args.args[0]
+    assert "不可用" in chain.get_plain_text(with_other_comps_mark=True)
+
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider testing `status` when scheduler exists but has no recent report

Right now you test only the scheduler-unavailable case and the path where `_last_report` is set. Please add a test for the case where the scheduler exists but has never run:
- Create a scheduler with `enabled=True` and `last_report` / `last_error` unset.
- Call `status` and assert the output includes the base config and the "最近任务:暂无" fallback.

This will lock in the behavior and formatting for the `last_report is None` branch.

Suggested implementation:

```python
from astrbot.builtin_stars.builtin_commands.commands.context_compaction import (
    ContextCompactionCommands,
)
from astrbot.core.context_compaction_scheduler import PeriodicContextCompactionScheduler


@pytest.mark.asyncio
async def test_status_when_scheduler_unavailable() -> None:
    command = ContextCompactionCommands(context=SimpleNamespace())
    event = SimpleNamespace(send=AsyncMock())

    await command.status(event)

    event.send.assert_awaited_once()
    chain = event.send.await_args.args[0]
    assert "不可用" in chain.get_plain_text(with_other_comps_mark=True)


@pytest.mark.asyncio
async def test_status_when_scheduler_exists_but_has_no_report() -> None:
    # Build a scheduler that is enabled but has never run
    # (last_report / last_error both None)
    scheduler = PeriodicContextCompactionScheduler(
        config_manager=SimpleNamespace(),
        conversation_manager=SimpleNamespace(),
        provider_manager=SimpleNamespace(),
    )
    # Ensure the enabled state and that no report exists yet
    scheduler.enabled = True
    if hasattr(scheduler, "_last_report"):
        scheduler._last_report = None  # type: ignore[attr-defined]
    if hasattr(scheduler, "_last_error"):
        scheduler._last_error = None  # type: ignore[attr-defined]

    context = SimpleNamespace(context_compaction_scheduler=scheduler)
    command = ContextCompactionCommands(context=context)
    event = SimpleNamespace(send=AsyncMock())

    await command.status(event)

    event.send.assert_awaited_once()
    chain = event.send.await_args.args[0]
    text = chain.get_plain_text(with_other_comps_mark=True)

    # Assert the output includes the base config (mirroring the existing
    # "has last report" test) as well as the "最近任务:暂无" fallback text
    assert "最近任务:暂无" in text

```

1. If this file already provides a helper function or pytest fixture for building `PeriodicContextCompactionScheduler` (e.g. one that wires up a full `config_manager`), reuse it in the new test instead of instantiating `PeriodicContextCompactionScheduler` directly, replacing the construction above with the same approach the other tests use.
2. To satisfy the "includes the base config" assertion, copy the base-config assertions from the existing "has last report" test (e.g. checks for fixed prefixes / field names) into this test and use them alongside `assert "最近任务:暂无" in text`.
3. If `PeriodicContextCompactionScheduler` does not allow setting `enabled` or the internal `_last_report` / `_last_error` directly, use its public API instead (e.g. `scheduler.enable()` or a reset/cleanup method) to reach the "enabled but never run" state, then keep the subsequent assertions on the `status` output.
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_compaction_scheduler.py" line_range="411" />
<code_context>
+    def _resolve_wait_seconds(cfg: dict[str, Any]) -> int:
+        return max(1, int(cfg["interval_minutes"])) * 60
+
+    def _load_config(self) -> dict[str, Any]:
+        raw_cfg = self._load_raw_config()
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing an immutable CompactionConfig dataclass and using typed attributes instead of a free-form cfg dict to simplify configuration handling and downstream logic.

You can reduce the complexity meaningfully by introducing a small immutable config object and using it instead of a free‑form dict. This removes a lot of repeated `cfg[...]` indexing and makes the logic in `_check_eligibility`, `_run_compaction_rounds`, `_iter_candidate_conversations`, etc. much easier to follow without changing functionality.

### 1. Introduce a `CompactionConfig` dataclass

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CompactionConfig:
    enabled: bool
    interval_minutes: int
    startup_delay_seconds: int
    max_conversations_per_run: int
    max_scan_per_run: int
    scan_page_size: int
    min_idle_minutes: int
    min_messages: int
    target_tokens: int
    trigger_tokens: int
    max_rounds: int
    truncate_turns: int
    keep_recent: int
    provider_id: str
    instruction: str
    dry_run: bool
```

### 2. Have `_load_config` return `CompactionConfig`

You can reuse your existing normalization logic but assign into the dataclass at the end, so the parsing complexity stays localized:

```python
def _load_config(self) -> CompactionConfig:
    raw_cfg = self._load_raw_config()

    cfg = dict(self._DEFAULTS)
    cfg.update(raw_cfg)

    cfg["enabled"] = self._to_bool(cfg.get("enabled"), False)
    self._normalize_int(cfg, "interval_minutes", 30, 1)
    self._normalize_int(cfg, "startup_delay_seconds", 120, 0)
    self._normalize_int(cfg, "max_conversations_per_run", 8, 1)
    self._normalize_int(cfg, "max_scan_per_run", 120, 1)
    self._normalize_int(cfg, "scan_page_size", 40, 10)
    self._normalize_int(cfg, "min_idle_minutes", 15, 0)
    self._normalize_int(cfg, "min_messages", 14, 2)
    self._normalize_int(cfg, "target_tokens", 4096, 512)
    self._normalize_trigger_tokens(cfg, raw_cfg)
    self._normalize_int(cfg, "max_rounds", 3, 1)
    self._normalize_int(cfg, "truncate_turns", 1, 1)
    self._normalize_int(cfg, "keep_recent", 6, 0)
    cfg["provider_id"] = str(cfg.get("provider_id", "") or "").strip()
    cfg["instruction"] = str(cfg.get("instruction", "") or "").strip()
    cfg["dry_run"] = self._to_bool(cfg.get("dry_run"), False)

    return CompactionConfig(
        enabled=cfg["enabled"],
        interval_minutes=cfg["interval_minutes"],
        startup_delay_seconds=cfg["startup_delay_seconds"],
        max_conversations_per_run=cfg["max_conversations_per_run"],
        max_scan_per_run=cfg["max_scan_per_run"],
        scan_page_size=cfg["scan_page_size"],
        min_idle_minutes=cfg["min_idle_minutes"],
        min_messages=cfg["min_messages"],
        target_tokens=cfg["target_tokens"],
        trigger_tokens=cfg["trigger_tokens"],
        max_rounds=cfg["max_rounds"],
        truncate_turns=cfg["truncate_turns"],
        keep_recent=cfg["keep_recent"],
        provider_id=cfg["provider_id"],
        instruction=cfg["instruction"],
        dry_run=cfg["dry_run"],
    )
```

### 3. Use `CompactionConfig` in the main logic

This immediately simplifies the hot paths without changing behavior:

```python
def get_status(self) -> dict[str, Any]:
    cfg = self._load_config()
    return {
        "running": self._running_lock.locked(),
        "bootstrapped": self._bootstrapped,
        "stop_requested": self._stop_event.is_set(),
        "config": cfg.__dict__,
        "last_started_at": self._last_started_at,
        "last_finished_at": self._last_finished_at,
        "last_error": self._last_error,
        "last_report": self._last_report,
    }
```

```python
@staticmethod
def _resolve_run_limits(
    cfg: CompactionConfig,
    max_conversations_override: int | None,
) -> tuple[int, int, int]:
    max_to_compact = max(1, cfg.max_conversations_per_run)
    if max_conversations_override is not None:
        max_to_compact = max(1, int(max_conversations_override))
    max_to_scan = max(max_to_compact, cfg.max_scan_per_run)
    scan_page_size = max(10, cfg.scan_page_size)
    return max_to_compact, max_to_scan, scan_page_size
```

```python
def _check_eligibility(
    self,
    conv: ConversationV2,
    cfg: CompactionConfig,
) -> _EligibilityResult:
    history = conv.content
    if not isinstance(history, list) or len(history) < cfg.min_messages:
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    if not self._is_idle_enough(conv.updated_at, cfg.min_idle_minutes):
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    messages = self._history_parser.parse(history)
    if len(messages) < cfg.min_messages:
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    trusted_usage = conv.token_usage if isinstance(conv.token_usage, int) else 0
    before_tokens = self._token_counter.count_tokens(messages, trusted_usage)
    if before_tokens < cfg.trigger_tokens:
        return _EligibilityResult(eligible=False, messages=[], before_tokens=0)

    return _EligibilityResult(
        eligible=True,
        messages=messages,
        before_tokens=before_tokens,
    )
```

```python
async def _run_compaction_rounds(
    self,
    messages: list[Message],
    provider: Provider,
    cfg: CompactionConfig,
) -> _RoundResult:
    compressed = messages
    changed = False
    rounds = 0
    instruction = self._resolve_instruction(cfg)

    for _ in range(cfg.max_rounds):
        current_tokens = self._token_counter.count_tokens(compressed)
        if current_tokens <= cfg.target_tokens:
            break

        manager = self._build_context_manager(cfg, provider, instruction)
        rounds += 1
        next_messages = await manager.process(compressed)
        if self._messages_equal(compressed, next_messages):
            break

        compressed = next_messages
        changed = True

    return _RoundResult(messages=compressed, changed=changed, rounds=rounds)
```

This refactor keeps all existing behavior but:

- Localizes config normalization to one place.
- Reduces repeated string key lookups all over the class.
- Makes the compaction and eligibility logic more declarative and easier to read/test.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • The helper _safe_json on PeriodicContextCompactionScheduler is currently unused and just delegates to _MessageHistoryParser.safe_json; consider removing it or inlining calls to _MessageHistoryParser.safe_json to avoid dead code.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The helper `_safe_json` on `PeriodicContextCompactionScheduler` is currently unused and just delegates to `_MessageHistoryParser.safe_json`; consider removing it or inlining calls to `_MessageHistoryParser.safe_json` to avoid dead code.

## Individual Comments

### Comment 1
<location path="astrbot/core/context_compaction_scheduler.py" line_range="69" />
<code_context>
+    dry_run: bool
+
+
+class _MessageHistoryParser:
+    def parse(self, history: Iterable[Any]) -> list[Message]:
+        parsed: list[Message] = []
</code_context>
<issue_to_address>
**issue (complexity):** Consider removing the unused `_safe_json` wrapper and extracting `_MessageHistoryParser` into a separate utility module to decouple parsing from scheduling and reduce class size.

You can trim a bit of complexity without changing behavior by:

### 1. Removing dead `_safe_json` wrapper

`PeriodicContextCompactionScheduler._safe_json` is just a pass‑through to `_MessageHistoryParser.safe_json` and is not used anywhere in this file. Dropping it reduces surface area without changing functionality.

```python
class PeriodicContextCompactionScheduler:
    ...
-    @staticmethod
-    def _safe_json(value: Any) -> str | None:
-        return _MessageHistoryParser.safe_json(value)
```

If you keep it for future use, consider at least inlining callers to `_MessageHistoryParser.safe_json` when they appear instead of going through an extra indirection.

### 2. Moving `_MessageHistoryParser` into a dedicated utility

The message parsing/sanitization logic is sizeable and orthogonal to scheduling. Extracting it into a small utility module shrinks this class and clarifies responsibilities without changing behavior.

For example:

```python
# new file: astrbot/core/agent/message_history_parser.py

from __future__ import annotations
import json
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any

from astrbot.core.agent.message import Message

class MessageHistoryParser:
    def parse(self, history: Iterable[Any]) -> list[Message]:
        ...
    def sanitize_message_dict(self, item: dict[str, Any]) -> dict[str, Any] | None:
        ...
    def sanitize_content(self, content: Any, role: str) -> str | list[dict] | None:
        ...
    def sanitize_list_content(self, content: list[Any]) -> str | list[dict]:
        ...
    def sanitize_content_part(...):
        ...
    @staticmethod
    def safe_json(value: Any) -> str | None:
        ...
```

Then in the scheduler:

```python
-from astrbot.core.agent.message import Message
+from astrbot.core.agent.message import Message
+from astrbot.core.agent.message_history_parser import MessageHistoryParser

class PeriodicContextCompactionScheduler:
    def __init__(...):
        ...
-        self._history_parser = _MessageHistoryParser()
+        self._history_parser = MessageHistoryParser()
```

and remove the `_MessageHistoryParser` class from this file.

This keeps all behavior intact while making the scheduler easier to read and the parser reusable elsewhere.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • In the ChatProviderTemplate schema for periodic_context_compaction, the object definition uses an items field with keyed children instead of a properties/equivalent map (as used elsewhere for objects); double-check this matches your config validation engine’s expected shape, otherwise these nested fields may not be validated or surfaced correctly in the UI.
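The contrast the bullet describes can be made concrete with two hypothetical schema fragments (field names are stand-ins; which shape AstrBot's config engine actually expects must be checked against the existing `ChatProviderTemplate` entries):

```python
# Hypothetical: the same object schema expressed with keyed children
# under "items" versus under "properties".
with_items = {
    "periodic_context_compaction": {
        "type": "object",
        "items": {"enabled": {"type": "bool", "default": False}},
    }
}
with_properties = {
    "periodic_context_compaction": {
        "type": "object",
        "properties": {"enabled": {"type": "bool", "default": False}},
    }
}

# A validator that only walks one of these keys silently ignores fields
# nested under the other, which is the divergence the comment warns about.
nested_items = with_items["periodic_context_compaction"].get("properties", {})
```

If the engine walks `properties`, the `items`-shaped entry would render as an empty object in the UI, so the safest check is to compare against an existing nested object elsewhere in the template.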
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In the ChatProviderTemplate schema for `periodic_context_compaction`, the object definition uses an `items` field with keyed children instead of a `properties`/equivalent map (as used elsewhere for objects); double-check this matches your config validation engine’s expected shape, otherwise these nested fields may not be validated or surfaced correctly in the UI.

## Individual Comments

### Comment 1
<location path="astrbot/core/context_compaction_scheduler.py" line_range="283" />
<code_context>
+    def _resolve_wait_seconds(cfg: CompactionConfig) -> int:
+        return max(1, int(cfg.interval_minutes)) * 60
+
+    def _load_config(self) -> CompactionConfig:
+        raw_cfg = self._load_raw_config()
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting pure config-handling logic into a factory and constructing the ContextManager once per compaction to isolate responsibilities and simplify the scheduler’s control flow.

You can reduce the “god object” feel and simplify reasoning by extracting two self‑contained responsibilities without changing behavior:

1. **Config loading/normalization → dedicated factory**
2. **Multi‑round compaction → avoid rebuilding static pieces every round**

---

### 1. Extract config loading into a standalone factory

`_load_config`, `_load_raw_config`, `_resolve_trigger_tokens`, `_to_bool`, `_to_int` are all pure helpers and don’t need access to scheduler state. Pulling them out makes the scheduler smaller and lets you reuse them elsewhere.

For example, at module level:

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class CompactionConfig:
    enabled: bool
    interval_minutes: int
    startup_delay_seconds: int
    max_conversations_per_run: int
    max_scan_per_run: int
    scan_page_size: int
    min_idle_minutes: int
    min_messages: int
    target_tokens: int
    trigger_tokens: int
    max_rounds: int
    truncate_turns: int
    keep_recent: int
    provider_id: str
    instruction: str
    dry_run: bool


def load_compaction_config(default_conf: dict[str, Any]) -> CompactionConfig:
    provider_settings = default_conf.get("provider_settings", {})
    raw_cfg = provider_settings.get("periodic_context_compaction", {})
    if not isinstance(raw_cfg, dict):
        raw_cfg = {}

    cfg = dict(PERIODIC_CONTEXT_COMPACTION_DEFAULTS)
    cfg.update(raw_cfg)

    enabled = _to_bool(cfg.get("enabled"), False)
    interval_minutes = _to_int(cfg.get("interval_minutes"), 30, 1)
    startup_delay_seconds = _to_int(cfg.get("startup_delay_seconds"), 120, 0)
    max_conversations_per_run = _to_int(cfg.get("max_conversations_per_run"), 8, 1)
    max_scan_per_run = _to_int(cfg.get("max_scan_per_run"), 120, 1)
    scan_page_size = _to_int(cfg.get("scan_page_size"), 40, 10)
    min_idle_minutes = _to_int(cfg.get("min_idle_minutes"), 15, 0)
    min_messages = _to_int(cfg.get("min_messages"), 14, 2)
    target_tokens = _to_int(cfg.get("target_tokens"), 4096, 512)
    trigger_tokens = _resolve_trigger_tokens(target_tokens, raw_cfg)
    max_rounds = _to_int(cfg.get("max_rounds"), 3, 1)
    truncate_turns = _to_int(cfg.get("truncate_turns"), 1, 1)
    keep_recent = _to_int(cfg.get("keep_recent"), 6, 0)
    provider_id = str(cfg.get("provider_id", "") or "").strip()
    instruction = str(cfg.get("instruction", "") or "").strip()
    dry_run = _to_bool(cfg.get("dry_run"), False)

    return CompactionConfig(
        enabled=enabled,
        interval_minutes=interval_minutes,
        startup_delay_seconds=startup_delay_seconds,
        max_conversations_per_run=max_conversations_per_run,
        max_scan_per_run=max_scan_per_run,
        scan_page_size=scan_page_size,
        min_idle_minutes=min_idle_minutes,
        min_messages=min_messages,
        target_tokens=target_tokens,
        trigger_tokens=trigger_tokens,
        max_rounds=max_rounds,
        truncate_turns=truncate_turns,
        keep_recent=keep_recent,
        provider_id=provider_id,
        instruction=instruction,
        dry_run=dry_run,
    )


def _to_bool(value: Any, default: bool) -> bool:
    # same implementation as now
    ...


def _to_int(value: Any, default: int, min_value: int) -> int:
    # same implementation as now
    ...


def _resolve_trigger_tokens(target_tokens: int, raw_cfg: dict[str, Any]) -> int:
    # same implementation as now
    ...
```

Then inside the scheduler:

```python
class PeriodicContextCompactionScheduler:
    ...

    def _load_config(self) -> CompactionConfig:
        return load_compaction_config(self.config_manager.default_conf)
```

This shrinks the class surface and makes the config behavior explicit and testable with a simple unit test on `load_compaction_config` without instantiating the scheduler.

---

### 2. Avoid rebuilding `ContextManager` and re‑resolving instruction each round

`_run_compaction_rounds` currently recomputes `instruction` and builds a new `ContextManager` every loop iteration, even though `cfg`, `provider`, and `instruction` are invariant across rounds.

You can construct the manager once per compaction:

```python
async def _run_compaction_rounds(
    self,
    messages: list[Message],
    provider: Provider,
    cfg: CompactionConfig,
) -> _RoundResult:
    compressed = messages
    changed = False
    rounds = 0

    instruction = self._resolve_instruction(cfg)
    manager = self._build_context_manager(cfg, provider, instruction)

    for _ in range(cfg.max_rounds):
        current_tokens = self._token_counter.count_tokens(compressed)
        if current_tokens <= cfg.target_tokens:
            break

        rounds += 1
        next_messages = await manager.process(compressed)
        if self._messages_equal(compressed, next_messages):
            break

        compressed = next_messages
        changed = True

    return _RoundResult(messages=compressed, changed=changed, rounds=rounds)
```

This keeps behavior identical but reduces per‑round overhead and makes the control flow easier to follow: “prepare manager once → loop until under target or no change.”
</issue_to_address>


@Jacobinwwey (Author):

@sourcery-ai review

@SourceryAI SourceryAI left a comment

Hey - I've found 1 issue


## Individual Comments

### Comment 1
<location path="astrbot/core/context_compaction_scheduler.py" line_range="103" />
<code_context>
+    return trigger_tokens
+
+
+def load_compaction_config(
+    default_conf: dict[str, Any],
+    defaults: dict[str, Any] | None = None,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the scheduler’s configuration and bookkeeping by removing defensive parsing and small helper types to make the control flow more direct and readable.

You can drop quite a bit of indirection here without losing any functionality.

### 1. Simplify config normalization

Given `default_conf` is already schema‑validated, you can assume proper types and avoid the very defensive `_to_bool`/`_to_int`/`_resolve_trigger_tokens` logic.

Example:

```python
def load_compaction_config(default_conf: dict[str, Any]) -> CompactionConfig:
    cfg = {
        **PERIODIC_CONTEXT_COMPACTION_DEFAULTS,
        **default_conf.get("provider_settings", {}).get("periodic_context_compaction", {}),
    }

    interval_minutes = max(cfg.get("interval_minutes", 30), 1)
    startup_delay_seconds = max(cfg.get("startup_delay_seconds", 120), 0)
    max_conversations_per_run = max(cfg.get("max_conversations_per_run", 8), 1)
    max_scan_per_run = max(cfg.get("max_scan_per_run", 120), 1)
    scan_page_size = max(cfg.get("scan_page_size", 40), 10)
    min_idle_minutes = max(cfg.get("min_idle_minutes", 15), 0)
    min_messages = max(cfg.get("min_messages", 14), 2)
    target_tokens = max(cfg.get("target_tokens", 4096), 512)

    raw_trigger = cfg.get("trigger_tokens")
    trigger_default = max(int(target_tokens * 1.5), target_tokens + 1)
    trigger_tokens = max(raw_trigger or trigger_default, 512)
    if trigger_tokens <= target_tokens:
        trigger_tokens = target_tokens + 1

    return CompactionConfig(
        enabled=bool(cfg.get("enabled", False)),
        interval_minutes=interval_minutes,
        startup_delay_seconds=startup_delay_seconds,
        max_conversations_per_run=max_conversations_per_run,
        max_scan_per_run=max_scan_per_run,
        scan_page_size=scan_page_size,
        min_idle_minutes=min_idle_minutes,
        min_messages=min_messages,
        target_tokens=target_tokens,
        trigger_tokens=trigger_tokens,
        max_rounds=max(cfg.get("max_rounds", 3), 1),
        truncate_turns=max(cfg.get("truncate_turns", 1), 1),
        keep_recent=max(cfg.get("keep_recent", 6), 0),
        provider_id=(cfg.get("provider_id") or "").strip(),
        instruction=(cfg.get("instruction") or "").strip(),
        dry_run=bool(cfg.get("dry_run", False)),
    )
```

This keeps clamping behavior but removes string parsing and most branching.

### 2. Remove duplicated defaults

`_DEFAULTS = dict(PERIODIC_CONTEXT_COMPACTION_DEFAULTS)` adds indirection without extra behavior. You can read directly from the shared constant:

```python
class PeriodicContextCompactionScheduler:
    def _load_config(self) -> CompactionConfig:
        return load_compaction_config(self.config_manager.default_conf)
```

And drop `_DEFAULTS` and the `defaults` parameter from `load_compaction_config`.

### 3. Collapse status bookkeeping into a single object

Instead of four separate attributes and a dict, you can track the last run in one dataclass. This makes `get_status` smaller and easier to scan:

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class _LastRun:
    started_at: str | None = None
    finished_at: str | None = None
    error: str | None = None
    report: dict[str, Any] | None = None

class PeriodicContextCompactionScheduler:
    def __init__(...):
        ...
        self._last_run = _LastRun()

    def get_status(self) -> dict[str, Any]:
        cfg = self._load_config()
        return {
            "running": self._running_lock.locked(),
            "bootstrapped": self._bootstrapped,
            "stop_requested": self._stop_event.is_set(),
            "config": asdict(cfg),
            **asdict(self._last_run),
        }

    async def run_once(...):
        async with self._running_lock:
            self._last_run.started_at = self._now_iso()
            ...
            self._last_run.report = report
            self._last_run.finished_at = self._now_iso()
            self._last_run.error = None
            return report
```

### 4. Inline very small helpers and drop micro‑dataclasses

Helpers that are one line and used once/twice add more jump points than value. Same for `_EligibilityResult` and `_RoundResult`, which are purely internal and very narrow in use.

Example: inline `_resolve_wait_seconds` and `_record_outcome`, and replace `_EligibilityResult` with a simple `None` / dict:

```python
async def run(self) -> None:
    ...
    while not self._stop_event.is_set():
        cfg = self._load_config()
        wait_seconds = max(1, cfg.interval_minutes) * 60
        ...

def _check_eligibility(
    self, conv: ConversationV2, cfg: CompactionConfig
) -> dict[str, Any] | None:
    history = conv.content
    if not isinstance(history, list) or len(history) < cfg.min_messages:
        return None
    if not self._is_idle_enough(conv.updated_at, cfg.min_idle_minutes):
        return None

    messages = self._history_parser.parse(history)
    if len(messages) < cfg.min_messages:
        return None

    trusted_usage = conv.token_usage if isinstance(conv.token_usage, int) else 0
    before_tokens = self._token_counter.count_tokens(messages, trusted_usage)
    if before_tokens < cfg.trigger_tokens:
        return None

    return {"messages": messages, "before_tokens": before_tokens}

async def _compact_one_conversation(...):
    eligibility = self._check_eligibility(conv, cfg)
    if not eligibility:
        return "skipped"
    ...
    after_tokens = self._token_counter.count_tokens(compressed)
    ...
```

Similarly, `_RoundResult` can be replaced with `(messages, changed, rounds)` or a small dict, and `round_result.rounds` / `.messages` updated accordingly. This removes two internal types and a few helper methods, making the control flow easier to follow without changing behavior.
</issue_to_address>

Hi @Jacobinwwey! 👋

Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀

Install the sourcery-ai bot to get automatic code reviews on every pull request ✨

Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 1 issue


## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory.py" line_range="8-17" />
<code_context>
+
+from astrbot.core.utils.config_normalization import to_bool, to_int
+
+DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = {
+    # Global switch for context-memory related features.
+    "enabled": False,
+    # Manually maintained top-level memories injected into system prompt.
+    "inject_pinned_memory": True,
+    "pinned_memories": [],
+    "pinned_max_items": 8,
+    "pinned_max_chars_per_item": 400,
+    # Retrieval enhancement is intentionally reserved for future PRs.
+    "retrieval_enabled": False,
+    "retrieval_backend": "",
+    "retrieval_provider_id": "",
+    "retrieval_top_k": 5,
+}
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Using a dict with a list value as a default template is safe only because you always copy; consider making this more explicit.

`DEFAULT_CONTEXT_MEMORY_SETTINGS` includes a mutable `"pinned_memories": []`, but `normalize_context_memory_settings` first copies the dict and then immediately overwrites `normalized["pinned_memories"]`, so the shared list isn’t mutated today.

To keep this obviously safe and future-proof, you could either:
- remove `"pinned_memories"` from the defaults and always set it in `normalize_context_memory_settings`, or
- deep-copy the defaults (e.g. `copy.deepcopy`) so any future mutable fields aren’t accidentally shared across configs.
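A minimal sketch of the pitfall (standalone names, not the module's actual ones): a shallow copy of the defaults shares the inner list across configs, while a deep copy does not:

```python
import copy

DEFAULTS = {"enabled": False, "pinned_memories": []}


def normalize_shallow(raw: dict) -> dict:
    return {**DEFAULTS, **raw}  # copies the dict, but the inner list is shared


def normalize_deep(raw: dict) -> dict:
    return {**copy.deepcopy(DEFAULTS), **raw}  # fresh list on every call


cfg = normalize_shallow({})
cfg["pinned_memories"].append("oops")
print(DEFAULTS["pinned_memories"])  # → ['oops']  the shared default mutated

DEFAULTS["pinned_memories"].clear()
cfg = normalize_deep({})
cfg["pinned_memories"].append("fine")
print(DEFAULTS["pinned_memories"])  # → []  defaults stay pristine
```

The current code avoids this only because it happens to overwrite the key right after copying; the deep-copy variant stays safe even if that invariant is lost later.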

Suggested implementation:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable

from astrbot.core.utils.config_normalization import to_bool, to_int


DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = {
    # Global switch for context-memory related features.
    "enabled": False,
    # Manually maintained top-level memories injected into system prompt.
    "inject_pinned_memory": True,
    "pinned_max_items": 8,
    "pinned_max_chars_per_item": 400,
    # Retrieval enhancement is intentionally reserved for future PRs.
    "retrieval_enabled": False,
    "retrieval_backend": "",
    "retrieval_provider_id": "",
    "retrieval_top_k": 5,
}

```

To fully implement the suggestion (removing the mutable default while keeping behavior):

1. In `normalize_context_memory_settings` (or the equivalent normalization function in this file), ensure that `"pinned_memories"` is *always* set explicitly on the normalized settings, e.g.:
   - If the user provides a list, validate/normalize it and assign it.
   - Otherwise, assign a new empty list: `normalized["pinned_memories"] = []`.
2. Remove any reliance on `"pinned_memories"` being present in `DEFAULT_CONTEXT_MEMORY_SETTINGS`; treat it as a field that is always initialized during normalization instead.
3. If there are tests asserting the shape of default settings, update them to expect `"pinned_memories"` to be present only after normalization (not necessarily in the raw defaults).
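The steps above can be sketched as follows, assuming normalization merges defaults and then owns the `pinned_memories` field outright (the trimmed defaults dict and function body here are illustrative, not the module's real contents):

```python
from typing import Any

DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = {
    "enabled": False,
    "pinned_max_items": 8,
}


def normalize_context_memory_settings(raw: Any) -> dict[str, Any]:
    base = raw if isinstance(raw, dict) else {}
    normalized = {**DEFAULT_CONTEXT_MEMORY_SETTINGS, **base}
    pinned = base.get("pinned_memories")
    # Always assign a fresh list, so no caller can share a mutable default.
    normalized["pinned_memories"] = (
        [str(item) for item in pinned] if isinstance(pinned, list) else []
    )
    return normalized
```

Because the field is built inside the function, the defaults dict never needs to carry a mutable value at all.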
</issue_to_address>


@Jacobinwwey (Author):

@sourcery-ai review

@SourceryAI SourceryAI left a comment

Hey - I've found 1 issue, and left some high level feedback:

- The context-memory defaults are currently defined in multiple places (`CONTEXT_MEMORY_DEFAULTS` in `core/config/default.py` and `DEFAULT_CONTEXT_MEMORY_SETTINGS` in `core/context_memory.py`); consider consolidating these into a single source of truth to avoid future drift between schema/config and runtime behavior.
- In `PostToolCompactionController.should_compact`, `_last_check_at` is updated even when the call is rejected by the debounce window, which effectively extends the debounce period on every call; if the intent is a fixed cool-down between actual compaction checks, you may want to update `_last_check_at` only when the debounce condition is not hit.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="127-133" />
<code_context>
+        if not self.config.enabled:
+            return False
+
+        now = time.monotonic()
+        if (
+            self.config.debounce_seconds > 0
+            and self._last_check_at > 0
+            and (now - self._last_check_at) < self.config.debounce_seconds
+        ):
+            self._last_check_at = now
+            return False
+        self._last_check_at = now
</code_context>
<issue_to_address>
**issue (bug_risk):** Debounce window is effectively extended on each suppressed check and may starve compaction under frequent tool calls

In `PostToolCompactionController.should_compact`, the debounce branch both returns `False` and updates `self._last_check_at` to `now`. Under frequent tool calls, this keeps sliding the reference time forward and can prevent compaction from ever running while traffic is busy.

To avoid this, only update `_last_check_at` when a check is actually performed (i.e., in the non-early-return path), so the debounce window is measured from the last real check instead of the last suppressed one.
</issue_to_address>


@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 2 issues


## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory.py" line_range="8-17" />
<code_context>
+
+from astrbot.core.utils.config_normalization import to_bool, to_int
+
+DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = {
+    # Global switch for context-memory related features.
+    "enabled": False,
+    # Manually maintained top-level memories injected into system prompt.
+    "inject_pinned_memory": True,
+    "pinned_max_items": 8,
+    "pinned_max_chars_per_item": 400,
+    # Retrieval enhancement is intentionally reserved for future PRs.
+    "retrieval_enabled": False,
+    "retrieval_backend": "",
+    "retrieval_provider_id": "",
+    "retrieval_top_k": 5,
+}
+
</code_context>
<issue_to_address>
**suggestion:** Context-memory defaults are duplicated between `CONTEXT_MEMORY_DEFAULTS` and `DEFAULT_CONTEXT_MEMORY_SETTINGS`, which risks drift.

`CONTEXT_MEMORY_DEFAULTS` in `core/config/default.py` and `DEFAULT_CONTEXT_MEMORY_SETTINGS` here both define the same defaults, which can drift over time (e.g., missing keys or mismatched values) and cause subtle inconsistencies between persisted config and normalization. It’d be better to derive one from the other or move these defaults into a single shared module so there’s only one source of truth.

Suggested implementation:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable

from astrbot.core.config.default import CONTEXT_MEMORY_DEFAULTS
from astrbot.core.utils.config_normalization import to_bool, to_int

# Keep a local copy so this module can freely mutate or extend settings at runtime
# without affecting the global defaults object.
DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = dict(CONTEXT_MEMORY_DEFAULTS)

```

To fully apply this approach, confirm that:
1. `CONTEXT_MEMORY_DEFAULTS` is defined and exported from `astrbot/core/config/default.py` with all the keys previously present in `DEFAULT_CONTEXT_MEMORY_SETTINGS` (e.g., `enabled`, `inject_pinned_memory`, `pinned_max_items`, `pinned_max_chars_per_item`, `retrieval_enabled`, `retrieval_backend`, `retrieval_provider_id`, `retrieval_top_k`).
2. Any future changes to context-memory defaults are made only in `CONTEXT_MEMORY_DEFAULTS`, so `DEFAULT_CONTEXT_MEMORY_SETTINGS` stays in sync automatically.
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_memory.py" line_range="23" />
<code_context>
+}
+
+
+@runtime_checkable
+class VectorLongTermMemoryRetriever(Protocol):
+    """Reserved protocol for future vector-DB long-term memory retrieval."""
</code_context>
<issue_to_address>
**issue (complexity):** Consider moving the protocol interfaces and global backend state into a separate `context_memory_backends` module so this file focuses solely on configuration and pinned-memory rendering.

You can keep all current functionality and future hooks while reducing complexity by factoring out the unused backends/interfaces into a dedicated module so this file only focuses on the shipped feature (config + pinned-memory system block).

### 1. Move protocols and globals into a separate module

Create a new module, e.g. `context_memory_backends.py`:

```python
# context_memory_backends.py
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class VectorLongTermMemoryRetriever(Protocol):
    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...


@runtime_checkable
class ContextMemoryEvolutionBackend(Protocol):
    async def evolve(
        self,
        *,
        unified_msg_origin: str,
        turns: list[str],
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        ...

    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...


@runtime_checkable
class ContextMemoryMigrationAdapter(Protocol):
    async def export_session(
        self,
        *,
        unified_msg_origin: str,
    ) -> dict[str, Any]:
        ...

    async def import_session(
        self,
        *,
        unified_msg_origin: str,
        payload: dict[str, Any],
    ) -> None:
        ...


_context_memory_evolution_backend: ContextMemoryEvolutionBackend | None = None
_context_memory_migration_adapter: ContextMemoryMigrationAdapter | None = None


def set_context_memory_evolution_backend(
    backend: ContextMemoryEvolutionBackend | None,
) -> None:
    global _context_memory_evolution_backend
    _context_memory_evolution_backend = backend


def get_context_memory_evolution_backend() -> ContextMemoryEvolutionBackend | None:
    return _context_memory_evolution_backend


def set_context_memory_migration_adapter(
    adapter: ContextMemoryMigrationAdapter | None,
) -> None:
    global _context_memory_migration_adapter
    _context_memory_migration_adapter = adapter


def get_context_memory_migration_adapter() -> ContextMemoryMigrationAdapter | None:
    return _context_memory_migration_adapter
```

Then trim the current file down to just config/normalization/rendering:

```python
# context_memory.py
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from astrbot.core.utils.config_normalization import to_bool, to_int

DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = {
    "enabled": False,
    "inject_pinned_memory": True,
    "pinned_max_items": 8,
    "pinned_max_chars_per_item": 400,
    # keep retrieval settings, but they no longer clutter this module with backends
    "retrieval_enabled": False,
    "retrieval_backend": "",
    "retrieval_provider_id": "",
    "retrieval_top_k": 5,
}

# ... keep ContextMemoryConfig, normalize_context_memory_settings,
# load_context_memory_config, ensure_context_memory_settings,
# build_pinned_memory_system_block as-is ...
```

If/when you wire in evolution/migration/retrieval in a future PR, you can import from `context_memory_backends` where needed. This keeps the public APIs and future hooks intact, but makes the current module easier to read and focused on the shipped behavior.
</issue_to_address>


@Jacobinwwey (Author):

@sourcery-ai review

@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 2 issues, and left some high level feedback:

- In `PeriodicContextCompactionScheduler._compact_one_conversation`, dry-run executions are reported as `"skipped"`, which makes it hard to distinguish true skips from dry-run compactions in stats; consider using a distinct outcome flag or including a dry-run marker in the report so operators can see how many conversations would have been compacted.
- You’ve introduced generic normalization helpers in `core/utils/config_normalization.py`, but `ContextMemoryCommands._parse_switch` and several spots where ints/bools are parsed (e.g. `ContextMemoryCommands.add/ls` and parts of the scheduler) still reimplement parsing logic; reusing `to_bool`/`to_int` there would reduce duplication and keep behavior consistent across features.
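For the first point, one option is a small outcome enum rather than overloading `"skipped"` (the names `CompactionOutcome` and `classify_outcome` are illustrative, not the PR's API):

```python
from enum import Enum


class CompactionOutcome(str, Enum):
    COMPACTED = "compacted"
    SKIPPED = "skipped"    # did not meet eligibility thresholds
    DRY_RUN = "dry_run"    # would have been compacted; writes suppressed


def classify_outcome(eligible: bool, dry_run: bool) -> CompactionOutcome:
    """Keep dry-run results distinguishable from genuine skips in reports."""
    if not eligible:
        return CompactionOutcome.SKIPPED
    return CompactionOutcome.DRY_RUN if dry_run else CompactionOutcome.COMPACTED
```

Because the enum subclasses `str`, existing report consumers that compare against plain strings keep working while operators gain a separate dry-run count.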

## Individual Comments

### Comment 1
<location path="tests/agent/test_token_counter.py" line_range="110-119" />
<code_context>
         assert tokens > text_only
+
+
+class TestCounterFactory:
+    def test_create_estimate_mode(self):
+        created = create_token_counter("estimate")
+        assert isinstance(created, EstimateTokenCounter)
+
+    def test_create_unknown_mode_fallback(self):
+        created = create_token_counter("unknown-mode")
+        assert isinstance(created, EstimateTokenCounter)
+
+    def test_create_tokenizer_mode_returns_valid_counter_type(self):
+        created = create_token_counter("tokenizer", model="gpt-4")
+        assert isinstance(created, (TokenizerTokenCounter, EstimateTokenCounter))
+
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen tests around `TokenizerTokenCounter.count_tokens`, especially trusted usage and fallback paths

The new tests cover factory behavior and a broken `tiktoken` import. To more fully exercise `TokenizerTokenCounter`, please also add:

1) A test where `TokenizerTokenCounter.count_tokens` is called with `trusted_token_usage > 0` and mixed message parts, asserting the trusted value is returned and `_encode` is never called.
2) A test where `_encode` is patched to raise once, asserting `_encode_len` falls back to `EstimateTokenCounter.estimate_text_tokens` instead of propagating the error.
3) An integration-style test with `TextPart`, `ThinkPart`, `ImageURLPart`, and `AudioURLPart` in `messages` to confirm non-text modalities use their fixed estimates in tokenizer mode.

This will better specify tokenizer-based counting and its resilience guarantees.

Suggested implementation:

```python
from astrbot.core.agent.context.token_counter import (
    AUDIO_TOKEN_ESTIMATE,
    IMAGE_TOKEN_ESTIMATE,
    EstimateTokenCounter,
    TokenizerTokenCounter,
    create_token_counter,
)
from astrbot.core.agent.context.message import (
    TextPart,
    ThinkPart,
    ImageURLPart,
    AudioURLPart,
)


class TestCounterFactory:
    def test_create_estimate_mode(self):
        created = create_token_counter("estimate")
        assert isinstance(created, EstimateTokenCounter)

    def test_create_unknown_mode_fallback(self):
        created = create_token_counter("unknown-mode")
        assert isinstance(created, EstimateTokenCounter)

    def test_create_tokenizer_mode_returns_valid_counter_type(self):
        created = create_token_counter("tokenizer", model="gpt-4")
        assert isinstance(created, (TokenizerTokenCounter, EstimateTokenCounter))


class TestTokenizerTokenCounterBehavior:
    def test_count_tokens_uses_trusted_usage_and_skips_encode(self, monkeypatch):
        counter = TokenizerTokenCounter(model="gpt-4")

        # if this is ever called, the test should fail
        def _encode_should_not_be_called(*args, **kwargs):  # pragma: no cover - safety
            raise AssertionError("_encode should not be called when trusted_token_usage is provided")

        monkeypatch.setattr(counter, "_encode", _encode_should_not_be_called)

        messages = [
            {
                "role": "user",
                "content": [
                    TextPart(text="hello"),
                    ThinkPart(text="internal thoughts"),
                ],
            },
            {
                "role": "assistant",
                "content": [
                    TextPart(text="answer"),
                ],
            },
        ]

        trusted_usage = 123
        result = counter.count_tokens(messages, trusted_token_usage=trusted_usage)
        assert result == trusted_usage

    def test_encode_error_falls_back_to_estimate_text_tokens(self, monkeypatch):
        counter = TokenizerTokenCounter(model="gpt-4")

        def broken_encode(_text):
            raise RuntimeError("tiktoken failure")

        monkeypatch.setattr(counter, "_encode", broken_encode)

        captured = {}

        def fake_estimate(text: str) -> int:
            captured["text"] = text
            return 42

        # estimate_text_tokens is used as a fallback path
        monkeypatch.setattr(
            EstimateTokenCounter,
            "estimate_text_tokens",
            staticmethod(fake_estimate),
        )

        result = counter._encode_len("fallback text")
        assert result == 42
        assert captured["text"] == "fallback text"

    def test_tokenizer_mode_mixed_modalities_use_fixed_estimates(self, monkeypatch):
        counter = TokenizerTokenCounter(model="gpt-4")

        # Make tokenizer behavior deterministic and independent of real tiktoken
        def fake_encode_len(text: str) -> int:
            # count tokens as simple word count to make the expectation clear
            return len(text.split())

        monkeypatch.setattr(counter, "_encode_len", fake_encode_len)

        messages = [
            {
                "role": "user",
                "content": [
                    TextPart(text="hello world"),
                    ImageURLPart(url="http://example.com/image.png"),
                ],
            },
            {
                "role": "assistant",
                "content": [
                    ThinkPart(text="thinking hard"),
                    AudioURLPart(url="http://example.com/audio.mp3"),
                ],
            },
        ]

        # text tokens via tokenizer, non-text via fixed estimates
        expected_text_tokens = fake_encode_len("hello world") + fake_encode_len("thinking hard")
        expected_non_text_tokens = IMAGE_TOKEN_ESTIMATE + AUDIO_TOKEN_ESTIMATE

        tokens = counter.count_tokens(messages)
        assert tokens == expected_text_tokens + expected_non_text_tokens

```

These edits assume:
1. `TextPart`, `ThinkPart`, `ImageURLPart`, and `AudioURLPart` are imported from `astrbot.core.agent.context.message` and are valid content elements for `messages` as used by `TokenizerTokenCounter.count_tokens`.
2. `TokenizerTokenCounter.count_tokens` accepts a `trusted_token_usage` keyword argument and short-circuits to it without invoking `_encode`/`_encode_len` when `trusted_token_usage > 0`.
3. `TokenizerTokenCounter` exposes a `_encode_len` helper that is used by `count_tokens` to compute text token lengths.
If any of these APIs differ in your codebase, adjust the tests’ `messages` structure, method names, or the import path for content-part classes to match the actual implementations.
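For readers unfamiliar with the assumed contract, here is a minimal self-contained sketch of assumption 2 (the `trusted_token_usage` short-circuit). `TokenizerTokenCounter`, `_encode_len`, and `TextPart` below are simplified stand-ins for illustration only, not the real implementations:

```python
from dataclasses import dataclass


@dataclass
class TextPart:
    """Simplified stand-in for the real content-part class."""

    text: str


class TokenizerTokenCounter:
    """Sketch of the assumed count_tokens contract (not the real class)."""

    def _encode_len(self, text: str) -> int:
        # Stand-in tokenizer: count whitespace-separated words.
        return len(text.split())

    def count_tokens(self, messages, trusted_token_usage: int = 0) -> int:
        # Assumption 2: a positive trusted usage short-circuits
        # tokenization entirely, so _encode_len is never reached.
        if trusted_token_usage > 0:
            return trusted_token_usage
        total = 0
        for message in messages:
            for part in message.get("content", []):
                total += self._encode_len(getattr(part, "text", ""))
        return total
```

Under this contract, the first test above needs no tokenizer at all, which is exactly why monkeypatching `_encode` to raise is a valid guard.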
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_memory_backends.py" line_range="6" />
<code_context>
+from typing import Any, Protocol, runtime_checkable
+
+
+@runtime_checkable
+class VectorLongTermMemoryRetriever(Protocol):
+    """Reserved protocol for future vector-DB long-term memory retrieval."""
</code_context>
<issue_to_address>
**issue (complexity):** Consider moving these protocol and global hook definitions into a separate clearly marked experimental/internal module so the main context-memory module presents a smaller, more focused API surface.

You can keep the hooks while shrinking the conceptual surface by moving them into a clearly marked “experimental”/internal module so they don’t distract from the core context‑memory feature.

For example, instead of defining all three protocols and globals in the main context‑memory module, extract them:

```python
# context_memory_experimental_backends.py (or similar internal/experimental module)

from __future__ import annotations
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class VectorLongTermMemoryRetriever(Protocol):
    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...

@runtime_checkable
class ContextMemoryEvolutionBackend(Protocol):
    async def evolve(
        self,
        *,
        unified_msg_origin: str,
        turns: list[str],
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        ...

    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...

@runtime_checkable
class ContextMemoryMigrationAdapter(Protocol):
    async def export_session(
        self,
        *,
        unified_msg_origin: str,
    ) -> dict[str, Any]:
        ...

    async def import_session(
        self,
        *,
        unified_msg_origin: str,
        payload: dict[str, Any],
    ) -> None:
        ...

_context_memory_evolution_backend: ContextMemoryEvolutionBackend | None = None
_context_memory_migration_adapter: ContextMemoryMigrationAdapter | None = None

def set_context_memory_evolution_backend(
    backend: ContextMemoryEvolutionBackend | None,
) -> None:
    global _context_memory_evolution_backend
    _context_memory_evolution_backend = backend

def get_context_memory_evolution_backend() -> ContextMemoryEvolutionBackend | None:
    return _context_memory_evolution_backend

def set_context_memory_migration_adapter(
    adapter: ContextMemoryMigrationAdapter | None,
) -> None:
    global _context_memory_migration_adapter
    _context_memory_migration_adapter = adapter

def get_context_memory_migration_adapter() -> ContextMemoryMigrationAdapter | None:
    return _context_memory_migration_adapter
```

Then in the main context‑memory module, only surface these as explicitly experimental hooks (or not at all, if no call sites yet), so readers can focus on the shipped behavior:

```python
# main context memory module

from .context_memory_experimental_backends import (
    VectorLongTermMemoryRetriever,
    ContextMemoryEvolutionBackend,
    ContextMemoryMigrationAdapter,
    get_context_memory_evolution_backend,
    set_context_memory_evolution_backend,
    get_context_memory_migration_adapter,
    set_context_memory_migration_adapter,
)

# ...core pinned system memory logic here...
```

This keeps all current functionality and extension points but makes the main feature easier to understand by clearly cordoning off future‑facing abstractions.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 2 issues, and left some high level feedback:

  • In PeriodicContextCompactionScheduler._compact_one_conversation, treating a missing/unresolvable provider as a "failed" outcome conflates infra/config issues with genuine compaction failures; consider classifying this path as "skipped" (and logging) so failure stats better reflect actual compaction errors.
  • The ContextMemoryCommands._parse_switch logic duplicates the semantics of to_bool in core.utils.config_normalization; you could reuse that helper (with an explicit default) to keep switch parsing consistent with the rest of the config-normalization code.
## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory.py" line_range="26" />
<code_context>
+    return defaults
+
+
+DEFAULT_CONTEXT_MEMORY_SETTINGS: dict[str, Any] = _clone_context_memory_defaults()
+
+__all__ = [
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying this module by letting ContextMemoryConfig handle normalization directly, removing the mutable global defaults, and keeping backend exports in their own module to reduce indirection and coupling.

You can keep the same behavior while reducing indirection and shared mutable state by:

1. Collapsing `DEFAULT_CONTEXT_MEMORY_SETTINGS` + `normalize_context_memory_settings` + `load_context_memory_config` into a single normalization/constructor entry point on `ContextMemoryConfig`.
2. Eliminating the mutable global `DEFAULT_CONTEXT_MEMORY_SETTINGS` and `_clone_context_memory_defaults`.
3. Decoupling the config module from the experimental backend exports.

### 1. Let the dataclass own normalization

Instead of a free `normalize_context_memory_settings` + `load_context_memory_config`, you can push this into a `@classmethod` on `ContextMemoryConfig` that:
- pulls from `provider_settings["context_memory"]`
- applies all the current `to_bool` / `to_int` logic
- returns an immutable config instance

Example:

```python
@dataclass(frozen=True)
class ContextMemoryConfig:
    enabled: bool = False
    inject_pinned_memory: bool = True
    pinned_memories: list[str] = field(default_factory=list)
    pinned_max_items: int = 8
    pinned_max_chars_per_item: int = 400
    retrieval_enabled: bool = False
    retrieval_backend: str = ""
    retrieval_provider_id: str = ""
    retrieval_top_k: int = 5

    @classmethod
    def from_settings(cls, provider_settings: dict[str, Any] | None) -> "ContextMemoryConfig":
        raw = (provider_settings or {}).get("context_memory") or {}
        # Use CONTEXT_MEMORY_DEFAULTS directly as immutable defaults
        d = CONTEXT_MEMORY_DEFAULTS

        enabled = to_bool(raw.get("enabled"), bool(d["enabled"]))
        inject_pinned = to_bool(raw.get("inject_pinned_memory"), bool(d["inject_pinned_memory"]))
        pinned_max_items = to_int(raw.get("pinned_max_items"), int(d["pinned_max_items"]), 1)
        pinned_max_chars = to_int(
            raw.get("pinned_max_chars_per_item"),
            int(d["pinned_max_chars_per_item"]),
            1,
        )
        retrieval_enabled = to_bool(raw.get("retrieval_enabled"), bool(d["retrieval_enabled"]))
        retrieval_backend = str(raw.get("retrieval_backend", "") or "").strip()
        retrieval_provider_id = str(raw.get("retrieval_provider_id", "") or "").strip()
        retrieval_top_k = to_int(raw.get("retrieval_top_k"), int(d["retrieval_top_k"]), 1)

        pinned_raw = raw.get("pinned_memories", []) or []
        pinned_memories: list[str] = []
        if isinstance(pinned_raw, list):
            for item in pinned_raw:
                text = str(item or "").strip()
                if not text:
                    continue
                if len(text) > pinned_max_chars:
                    text = text[:pinned_max_chars]
                pinned_memories.append(text)
                if len(pinned_memories) >= pinned_max_items:
                    break

        return cls(
            enabled=enabled,
            inject_pinned_memory=inject_pinned,
            pinned_memories=pinned_memories,
            pinned_max_items=pinned_max_items,
            pinned_max_chars_per_item=pinned_max_chars,
            retrieval_enabled=retrieval_enabled,
            retrieval_backend=retrieval_backend,
            retrieval_provider_id=retrieval_provider_id,
            retrieval_top_k=retrieval_top_k,
        )
```

Then `load_context_memory_config` can just delegate:

```python
def load_context_memory_config(provider_settings: dict[str, Any] | None) -> ContextMemoryConfig:
    return ContextMemoryConfig.from_settings(provider_settings)
```

This removes `normalize_context_memory_settings` as a public concept and makes the dataclass the single source of truth.

### 2. Avoid mutable default settings

You can drop `_clone_context_memory_defaults` and `DEFAULT_CONTEXT_MEMORY_SETTINGS` entirely if `CONTEXT_MEMORY_DEFAULTS` is treated as immutable and you always create new lists in `from_settings` as shown above.

If you still want a “normalized dict” form for persistence (the current `ensure_context_memory_settings` behavior), you can build it from the config instance instead of passing around normalized dicts:

```python
def ensure_context_memory_settings(provider_settings: dict[str, Any]) -> dict[str, Any]:
    config = ContextMemoryConfig.from_settings(provider_settings)
    provider_settings["context_memory"] = {
        "enabled": config.enabled,
        "inject_pinned_memory": config.inject_pinned_memory,
        "pinned_memories": list(config.pinned_memories),
        "pinned_max_items": config.pinned_max_items,
        "pinned_max_chars_per_item": config.pinned_max_chars_per_item,
        "retrieval_enabled": config.retrieval_enabled,
        "retrieval_backend": config.retrieval_backend,
        "retrieval_provider_id": config.retrieval_provider_id,
        "retrieval_top_k": config.retrieval_top_k,
    }
    return provider_settings["context_memory"]
```

This keeps the “persist normalized subtree” behavior without a separate normalization pipeline and without a mutable global default dict.

### 3. Decouple experimental backend exports

The config module currently re-exports:

```python
__all__ = [
    "ContextMemoryConfig",
    "DEFAULT_CONTEXT_MEMORY_SETTINGS",
    "normalize_context_memory_settings",
    "load_context_memory_config",
    "ensure_context_memory_settings",
    "build_pinned_memory_system_block",
    "VectorLongTermMemoryRetriever",
    "ContextMemoryEvolutionBackend",
    "ContextMemoryMigrationAdapter",
    "set_context_memory_evolution_backend",
    "get_context_memory_evolution_backend",
    "set_context_memory_migration_adapter",
    "get_context_memory_migration_adapter",
]
```

To reduce conceptual surface area for “how does context_memory config work?”, you can stop re-exporting the backend extension points here and only import them where they are actually needed:

```python
__all__ = [
    "ContextMemoryConfig",
    "load_context_memory_config",
    "ensure_context_memory_settings",
    "build_pinned_memory_system_block",
]
```

Call sites that need `VectorLongTermMemoryRetriever` or the evolution/migration hooks can import them directly from `astrbot.core.context_memory_experimental_backends`, which keeps this module focused purely on configuration and pinned-memory behavior.
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_memory_experimental_backends.py" line_range="68" />
<code_context>
+        ...
+
+
+_context_memory_evolution_backend: ContextMemoryEvolutionBackend | None = None
+_context_memory_migration_adapter: ContextMemoryMigrationAdapter | None = None
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating the experimental backends behind a single registry/facade so callers interact with one cohesive entry point instead of multiple globals and functions.

You can keep the extension points while reducing surface/mental load by:

1. **Hiding the globals behind a small registry object**, so readers see “one thing” instead of multiple global singletons.
2. **Grouping the experimental hooks under an explicit “experimental” façade**, so they don’t look like core context-memory API.

For example, instead of exporting two separate setters/getters and two module-level globals, encapsulate them:

```python
# Keep the Protocols as-is

class _ContextMemoryBackends:
    def __init__(self) -> None:
        self.evolution_backend: ContextMemoryEvolutionBackend | None = None
        self.migration_adapter: ContextMemoryMigrationAdapter | None = None

_backends = _ContextMemoryBackends()


def configure_context_memory_backends(
    *,
    evolution_backend: ContextMemoryEvolutionBackend | None = None,
    migration_adapter: ContextMemoryMigrationAdapter | None = None,
) -> None:
    """Experimental: configure optional context-memory backends in one place."""
    _backends.evolution_backend = evolution_backend
    _backends.migration_adapter = migration_adapter


def get_context_memory_evolution_backend() -> ContextMemoryEvolutionBackend | None:
    return _backends.evolution_backend


def get_context_memory_migration_adapter() -> ContextMemoryMigrationAdapter | None:
    return _backends.migration_adapter
```

If you want to **further narrow the surface** and make the experimental nature explicit, you can expose a single accessor:

```python
class ExperimentalContextMemoryBackends(Protocol):
    evolution_backend: ContextMemoryEvolutionBackend | None
    migration_adapter: ContextMemoryMigrationAdapter | None


def get_experimental_context_memory_backends() -> ExperimentalContextMemoryBackends:
    return _backends
```

Callers that need the hooks use `configure_context_memory_backends` and `get_experimental_context_memory_backends()`, while the main context-memory flow only has to be aware of a single “experimental backends” concept instead of multiple globals and functions.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 2 issues


## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory_experimental_backends.py" line_range="80" />
<code_context>
+_backends = ExperimentalContextMemoryBackends()
+
+
+def configure_context_memory_backends(
+    *,
+    evolution_backend: ContextMemoryEvolutionBackend | None = None,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the public configuration API to a single configure/read pair instead of multiple getters/setters for the same global backends state.

The main complexity bump is the number of configuration entry points for the same global state. You can keep the Protocols and container as‑is but reduce cognitive load by exposing a single cohesive API and removing the duplicate setters/getters.

Concretely, you can:

1. Keep only `configure_context_memory_backends` as the way to configure, and
2. Keep `get_experimental_context_memory_backends` as the way to read, and
3. Drop the individual `set_*` / `get_*` helpers from the public API.

This preserves all capabilities while making the API smaller and more obvious.

Example:

```python
@dataclass
class ExperimentalContextMemoryBackends:
    evolution_backend: ContextMemoryEvolutionBackend | None = None
    migration_adapter: ContextMemoryMigrationAdapter | None = None


_backends = ExperimentalContextMemoryBackends()


def configure_context_memory_backends(
    *,
    evolution_backend: ContextMemoryEvolutionBackend | None = None,
    migration_adapter: ContextMemoryMigrationAdapter | None = None,
) -> None:
    _backends.evolution_backend = evolution_backend
    _backends.migration_adapter = migration_adapter


def get_experimental_context_memory_backends() -> ExperimentalContextMemoryBackends:
    return _backends
```

Call sites then use:

```python
configure_context_memory_backends(evolution_backend=my_evo_backend)

backends = get_experimental_context_memory_backends()
evo = backends.evolution_backend
migration = backends.migration_adapter
```

And simplify `__all__` to:

```python
__all__ = [
    "VectorLongTermMemoryRetriever",
    "ContextMemoryEvolutionBackend",
    "ContextMemoryMigrationAdapter",
    "ExperimentalContextMemoryBackends",
    "configure_context_memory_backends",
    "get_experimental_context_memory_backends",
]
```

If you still want the fine‑grained setters for internal use or tests, you can keep them but make them private and non‑exported (e.g. `_set_context_memory_evolution_backend`), so they don’t expand the public surface area.
</issue_to_address>

### Comment 2
<location path="astrbot/core/context_memory_backends.py" line_range="8" />
<code_context>
+explicitly isolated from stable context-memory config logic.
+"""
+
+from astrbot.core.context_memory_experimental_backends import (
+    ContextMemoryEvolutionBackend,
+    ContextMemoryMigrationAdapter,
</code_context>
<issue_to_address>
**issue (complexity):** Consider reducing duplication in this compatibility module by dynamically re-exporting symbols from the original experimental backend module instead of manually mirroring each one.

You can keep the compatibility layer while reducing duplication and future drift by delegating to the original module instead of manually mirroring every symbol.

For example, either:

**1. Re-export the module namespace directly**

```python
"""Compatibility re-exports for experimental context-memory backend hooks."""

from astrbot.core import context_memory_experimental_backends as _exp

# Re-export all public names from the experimental module
__all__ = _exp.__all__
globals().update({name: getattr(_exp, name) for name in __all__})
```

This keeps the stable import path while ensuring any new exports added to `context_memory_experimental_backends` are automatically exposed here.

**2. Or, if you only want a subset, avoid double listing**

```python
"""Compatibility re-exports for experimental context-memory backend hooks."""

from astrbot.core import context_memory_experimental_backends as _exp

__all__ = [
    "VectorLongTermMemoryRetriever",
    "ContextMemoryEvolutionBackend",
    "ContextMemoryMigrationAdapter",
    "ExperimentalContextMemoryBackends",
    "configure_context_memory_backends",
    "get_experimental_context_memory_backends",
    "set_context_memory_evolution_backend",
    "get_context_memory_evolution_backend",
    "set_context_memory_migration_adapter",
    "get_context_memory_migration_adapter",
]

# Single source of truth for attribute access
globals().update({name: getattr(_exp, name) for name in __all__})
```

Both approaches preserve the public API and compatibility intent but avoid the maintenance burden and potential mismatches of 1:1 manual re-exports.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue


## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory_experimental_backends.py" line_range="77" />
<code_context>
+    migration_adapter: ContextMemoryMigrationAdapter | None = None
+
+
+_backends = ExperimentalContextMemoryBackends()
+
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the mutable global `_backends` singleton and its configure/get API with an explicit factory that constructs `ExperimentalContextMemoryBackends` instances on demand.

You can keep the experimental backends and protocols while reducing complexity by removing the mutable global singleton and configuration API. That keeps the feature surface smaller and makes the abstraction more obviously “just types” until they’re wired in.

### 1. Replace the global singleton with a factory

Instead of a module‑level `_backends` and `configure_*/get_*` indirection, expose a simple constructor. This preserves all capabilities (you can still build a container of backends) but avoids shared mutable state and lifecycle concerns.

```python
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class VectorLongTermMemoryRetriever(Protocol):
    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...

@runtime_checkable
class ContextMemoryEvolutionBackend(Protocol):
    async def evolve(
        self,
        *,
        unified_msg_origin: str,
        turns: list[str],
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        ...

    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...

@runtime_checkable
class ContextMemoryMigrationAdapter(Protocol):
    async def export_session(
        self,
        *,
        unified_msg_origin: str,
    ) -> dict[str, Any]:
        ...

    async def import_session(
        self,
        *,
        unified_msg_origin: str,
        payload: dict[str, Any],
    ) -> None:
        ...

@dataclass
class ExperimentalContextMemoryBackends:
    evolution_backend: ContextMemoryEvolutionBackend | None = None
    migration_adapter: ContextMemoryMigrationAdapter | None = None

def make_experimental_context_memory_backends(
    *,
    evolution_backend: ContextMemoryEvolutionBackend | None = None,
    migration_adapter: ContextMemoryMigrationAdapter | None = None,
) -> ExperimentalContextMemoryBackends:
    """Create an experimental backend bundle without module‑level state."""
    return ExperimentalContextMemoryBackends(
        evolution_backend=evolution_backend,
        migration_adapter=migration_adapter,
    )

__all__ = [
    "VectorLongTermMemoryRetriever",
    "ContextMemoryEvolutionBackend",
    "ContextMemoryMigrationAdapter",
    "ExperimentalContextMemoryBackends",
    "make_experimental_context_memory_backends",
]
```

Usage at future call sites becomes explicit and easier to reason about:

```python
backends = make_experimental_context_memory_backends(
    evolution_backend=my_evolution_backend,
    migration_adapter=my_migration_adapter,
)

# then pass `backends` or its fields where needed
await backends.evolution_backend.evolve(...)
```

This keeps the experimental design while:

- Eliminating the need to think about global state, thread‑safety, and lifecycle.
- Making the module purely about type/contract definitions plus a light factory, which is simpler until you have concrete integration points that might justify a registry or configurable singleton.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've left some high level feedback:

  • In PeriodicContextCompactionScheduler._resolve_token_counter you always read context_token_counter_mode from the global default_conf.provider_settings, ignoring any per-provider/user overrides; if different providers or tenants need different counting modes, consider resolving the mode from the actual runtime provider settings tied to the conversation instead of a single global value.
  • The idle filtering logic for periodic compaction is currently implemented both in _iter_candidate_conversations (via updated_before) and again in CompactionPolicy.is_idle_enough; you could simplify and avoid surprises by centralizing the idle check into a single place to make the effective condition easier to reason about.


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • In PeriodicContextCompactionScheduler._iter_candidate_conversations you always pass updated_before=None and then enforce idle thresholds in Python; consider pushing min_idle_minutes down into the DB via updated_before to reduce rows scanned per run on large datasets.
  • The _token_counter_cache in PeriodicContextCompactionScheduler is keyed by (mode, model) and never pruned; if many distinct models are used over time this could grow unbounded, so it may be worth adding a simple size cap or LRU-style eviction.
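One way to address the second point, sketched with a small LRU wrapper. The class and method names here are hypothetical, not part of the scheduler:

```python
from collections import OrderedDict
from typing import Callable


class BoundedCounterCache:
    """LRU-style cache for (mode, model) -> token counter with a size cap."""

    def __init__(self, max_size: int = 32) -> None:
        self._max_size = max_size
        self._cache: OrderedDict[tuple[str, str], object] = OrderedDict()

    def get_or_create(
        self, mode: str, model: str, factory: Callable[[], object]
    ) -> object:
        key = (mode, model)
        if key in self._cache:
            self._cache.move_to_end(key)  # mark as recently used
            return self._cache[key]
        counter = factory()
        self._cache[key] = counter
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)  # evict least recently used
        return counter

    def __len__(self) -> int:
        return len(self._cache)
```

With a cap of a few dozen entries, churn across many distinct models stays bounded while hot (mode, model) pairs remain cached.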

## Individual Comments

### Comment 1
<location path="astrbot/core/context_memory_experimental_backends.py" line_range="7" />
<code_context>
+from typing import Any, Protocol, runtime_checkable
+
+
+@runtime_checkable
+class VectorLongTermMemoryRetriever(Protocol):
+    """Experimental protocol for future vector-DB long-term memory retrieval."""
</code_context>
<issue_to_address>
**issue (complexity):** Consider collapsing the multiple experimental context-memory protocols and container into a single unified backend protocol to simplify the experimental API surface.

You can reduce surface area by collapsing the three experimental interfaces and the container into a single backend protocol that still supports all the same capabilities (evolve, retrieve, export, import) but avoids extra types and indirection until real use cases appear.

For example, instead of three Protocols + a dataclass + a factory:

```python
@runtime_checkable
class ContextMemoryBackend(Protocol):
    """Experimental protocol for context-memory evolution + migration."""

    async def evolve(
        self,
        *,
        unified_msg_origin: str,
        turns: list[str],
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        ...

    async def retrieve(
        self,
        *,
        unified_msg_origin: str,
        query: str,
        top_k: int,
    ) -> list[str]:
        ...

    async def export_session(
        self,
        *,
        unified_msg_origin: str,
    ) -> dict[str, Any]:
        ...

    async def import_session(
        self,
        *,
        unified_msg_origin: str,
        payload: dict[str, Any],
    ) -> None:
        ...
```

Then the “bundle” can just be a single optional backend instead of a dedicated dataclass + factory:

```python
ContextMemoryBackend | None  # used where ExperimentalContextMemoryBackends is planned

__all__ = [
    "ContextMemoryBackend",
]
```

This keeps all experimental capabilities (evolution, retrieval, migration) while:

- Removing one level of indirection (no container + factory).
- Avoiding three separate protocols for concepts that may later be merged/split based on real usage.
- Making it easier for future contributors to see “the” experimental backend surface in one place.
</issue_to_address>


@Jacobinwwey
Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've reviewed your changes and they look great!



@AstrBotDevs AstrBotDevs deleted a comment Mar 22, 2026
@Jacobinwwey
Author

@advent259141 @Fridemn [Passed Sourcery review] Needs maintainer review


Labels

area:core - The bug / feature is about astrbot's core, backend
size:XXL - This PR changes 1000+ lines, ignoring generated files.
