
perf: optimize the context compression module (reduce redundant computation / more accurate token estimation) #6655

Closed
rin259 wants to merge 2 commits into AstrBotDevs:master from
rin259:perf/context-compress-optimization

Conversation

@rin259
Contributor

@rin259 rin259 commented Mar 20, 2026

What this optimizes

1. Improved token estimation algorithm

  • More accurate estimation for mixed Chinese/English text
  • Distinct ratios for Chinese, English, digit, and special characters
  • Chinese: ~0.55 tokens/char
  • English: ~0.25 tokens/char
  • Digits: ~0.4 tokens/char
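The per-class rates above can be sketched as a small estimator. This is a minimal illustration with a hypothetical function name; the actual implementation lives in EstimateTokenCounter and also accounts for message-level overhead.

```python
def estimate_tokens(text: str) -> int:
    """Illustrative per-character-class token estimate (not the real module)."""
    chinese = english = digits = special = 0
    for c in text:
        if "\u4e00" <= c <= "\u9fff":   # CJK unified ideographs
            chinese += 1
        elif c.isdigit():
            digits += 1
        elif c.isalpha():
            english += 1
        else:
            special += 1
    # Rates from the PR description: CJK ~0.55, Latin letters ~0.25,
    # digits ~0.4, everything else ~0.2 tokens per character.
    return (int(chinese * 0.55) + int(english * 0.25)
            + int(digits * 0.4) + int(special * 0.2))
```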

2. Added caching

  • Token count cache (EstimateTokenCounter)
  • Summary cache (LLMSummaryCompressor)
  • Fewer redundant computations
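The caching idea can be sketched as a small bounded cache with hit/miss counters. Class and method names here are hypothetical; the actual caches live inside EstimateTokenCounter and LLMSummaryCompressor.

```python
from collections import OrderedDict

class TokenCountCache:
    """Illustrative bounded LRU-style cache with hit/miss statistics."""
    def __init__(self, max_size: int = 128):
        self._cache: OrderedDict = OrderedDict()
        self._max_size = max_size
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key, compute):
        if key in self._cache:
            self.hits += 1
            self._cache.move_to_end(key)  # refresh recency on hit
            return self._cache[key]
        self.misses += 1
        value = compute()
        self._cache[key] = value
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)  # evict the oldest entry
        return value
```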

3. ContextManager optimizations

  • Fewer redundant token computations
  • Compression statistics (get_stats())
  • Statistics reset (reset_stats())
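A minimal sketch of the statistics surface described above; the field names are assumptions based on this description, not the actual ContextManager internals.

```python
class CompressionStats:
    """Illustrative stats object behind get_stats()/reset_stats()."""
    def __init__(self):
        self.compression_count = 0
        self.last_token_count = None

    def get_stats(self) -> dict:
        # Snapshot of counters for monitoring compression behavior
        return {
            "compression_count": self.compression_count,
            "last_token_count": self.last_token_count,
        }

    def reset_stats(self) -> None:
        self.compression_count = 0
        self.last_token_count = None
```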

4. New unit tests

  • Token estimation accuracy tests
  • Cache behavior tests
  • Compressor tests
  • ContextManager integration tests

Expected impact

  • Less token-counting overhead on API calls
  • Faster responses in long-conversation scenarios
  • More precise context management

Testing

Run the tests: pytest tests/test_context_compression.py -v


Related issue: (if any)

Summary by Sourcery

Optimize the context compression pipeline with more accurate token estimation, caching, and statistics for better performance and observability.

New Features:

  • Introduce multimodal-aware token estimation that accounts for text, images, audio, and tool calls.
  • Add caching for token counts and LLM-generated summaries to avoid repeated computation.
  • Expose context manager statistics and reset APIs for monitoring compression behavior.

Enhancements:

  • Refine the token estimation algorithm for mixed Chinese/English text and different character types to improve context size control.
  • Improve truncate-by-turns compression behavior to support incremental, less aggressive truncation based on usage.
  • Reduce redundant token counting in the context manager by reusing cached counts and optimizing post-compression checks.

Tests:

  • Add comprehensive unit and integration tests covering token estimation accuracy, caching behavior, compressors, and context manager workflows.

Chores:

  • Simplify pytest configuration and add shared fixtures for context compression tests.

Main optimizations:
1. Improved token estimation algorithm
   - More accurate estimation for mixed Chinese/English text
   - Distinct ratios for Chinese, English, digit, and special characters

2. Added caching
   - Token count cache (EstimateTokenCounter)
   - Summary cache (LLMSummaryCompressor)
   - Fewer redundant computations

3. ContextManager optimizations
   - Fewer redundant token computations
   - Compression statistics

4. New unit tests
   - Token estimation accuracy tests
   - Cache behavior tests
   - Compressor tests

Expected impact:
- Less token-counting overhead on API calls
- Faster responses in long-conversation scenarios
- More precise context management
@auto-assign auto-assign bot requested review from Soulter and advent259141 March 20, 2026 01:49
@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Mar 20, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request aims to comprehensively improve the performance and efficiency of the context compression module by introducing a more accurate token estimation algorithm, a multi-level caching mechanism, and optimized internal logic in the context manager. These improvements reduce token-counting overhead on API calls, speed up responses in long-conversation scenarios, and provide finer-grained context management.

Highlights

  • Improved token estimation: more precise per-class ratios for mixed Chinese/English text, digits, and special characters, e.g. Chinese ~0.55 tokens/char, English ~0.25 tokens/char, digits ~0.4 tokens/char.
  • Caching: caches added to the token counter (EstimateTokenCounter) and the LLM summarizer (LLMSummaryCompressor) to reduce redundant computation and API-call overhead.
  • ContextManager optimizations: fewer redundant token computations, incremental compression support, and new APIs for retrieving and resetting compression statistics (get_stats(), reset_stats()).
  • New unit tests: comprehensive coverage of token estimation accuracy, caching, the compressors, and ContextManager integration.

@dosubot dosubot bot added the area:core The bug / feature is about astrbot's core, backend label Mar 20, 2026
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 7 issues, and left some high-level feedback:

  • The cache keys in EstimateTokenCounter._get_cache_key and LLMSummaryCompressor._generate_cache_key rely only on message count and a short preview of the last message, which can easily collide between different histories; consider including a hash of all message contents or a rolling checksum to avoid returning wrong cached results.
  • In ContextManager.process, _last_token_count is reused whenever len(result) == len(messages), but content can change without a length change; it would be safer to tie the token cache to a content-based key (e.g., hash of messages) instead of just the list length.
  • The new tests/conftest.py replaces a large set of shared fixtures and pytest configuration with context-specific fixtures at the repo root, which may unintentionally break or change behavior for other tests; consider moving these context-compression-specific fixtures into a test-module-local conftest.py or preserving the previous global fixtures and markers.

## Individual Comments

### Comment 1
<location path="astrbot/core/agent/context/token_counter.py" line_range="72-75" />
<code_context>
+        
+        Uses message content hash for quick cache lookup.
+        """
+        # Use the message count and the last message's content as a simple cache key
+        if not messages:
+            return 0
+        return hash((len(messages), str(messages[-1])[:100]))
+
     def count_tokens(
</code_context>
<issue_to_address>
**issue (bug_risk):** Cache key is too coarse and can return stale token counts for different histories.

Using only `len(messages)` and a truncated `str(messages[-1])` makes collisions likely: different histories with the same length and similar last message can share a cache entry and get a badly wrong token estimate, which will skew compression decisions.

Please consider a stronger key (e.g., a hash over roles+contents for all or several recent messages, or a monotonic `message_id`/timestamp if available). If that’s too expensive, narrowing the cache scope (e.g., only for the latest request) or providing a way to disable caching would reduce the risk of stale results.
</issue_to_address>
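One way to build the stronger key this comment asks for, sketched with messages modeled as (role, content) tuples for illustration (the real Message type differs):

```python
import hashlib

def history_cache_key(messages: list) -> str:
    """Hypothetical collision-resistant key over the full history."""
    h = hashlib.blake2b(digest_size=16)
    for role, content in messages:
        # Length-prefix each field so ("ab", "c") and ("a", "bc") cannot collide
        for field in (role, content):
            data = field.encode("utf-8")
            h.update(len(data).to_bytes(4, "big"))
            h.update(data)
    return h.hexdigest()
```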

### Comment 2
<location path="astrbot/core/agent/context/token_counter.py" line_range="145-154" />
<code_context>
+        digit_count = 0
+        special_count = 0
+        
+        for c in text:
+            if "\u4e00" <= c <= "\u9fff":
+                chinese_count += 1
+            elif c.isdigit():
+                digit_count += 1
+            elif c.isalpha():
+                english_count += 1
+            else:
+                special_count += 1
+        
+        # Use more precise estimation ratios
+        # Chinese: ~0.55 tokens/char (accounts for punctuation and spaces)
+        # English: ~0.25 tokens/char
+        # Digits: ~0.4 tokens/char
+        # Special characters: ~0.2 tokens/char
+        
+        chinese_tokens = int(chinese_count * 0.55)
+        english_tokens = int(english_count * 0.25)
+        digit_tokens = int(digit_count * 0.4)
+        special_tokens = int(special_count * 0.2)
+        
+        # Add message formatting overhead (role, content wrapper, etc.)
+        overhead = 4
+        
+        return chinese_tokens + english_tokens + digit_tokens + special_tokens + overhead
+
+    def get_cache_stats(self) -> dict:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Per-call `overhead` in `_estimate_tokens` likely overcounts when a single message has multiple parts.

Because `_count_tokens_internal` calls `_estimate_tokens` for each `TextPart`/`ThinkPart`, this per-call `overhead = 4` is effectively charged once per part instead of once per message. If the overhead is meant for per-message structures (role, wrappers, etc.), it should be applied at the message level in `_count_tokens_internal`, and `_estimate_tokens` should remain purely text-based.

Suggested implementation:

```python
        # Use more precise estimation ratios
        # Chinese: ~0.55 tokens/char (accounts for punctuation and spaces)
        # English: ~0.25 tokens/char
        # Digits: ~0.4 tokens/char
        # Special characters: ~0.2 tokens/char

        chinese_tokens = int(chinese_count * 0.55)
        english_tokens = int(english_count * 0.25)
        digit_tokens = int(digit_count * 0.4)
        special_tokens = int(special_count * 0.2)

        # Return only the text-based estimate; message-level overhead is excluded
        return chinese_tokens + english_tokens + digit_tokens + special_tokens

```

To fully implement your suggestion, `_estimate_tokens` is now purely text-based. You should also:

1. In `_count_tokens_internal`, after summing the token estimates for all `TextPart` / `ThinkPart` instances belonging to a single message, add a per-message overhead (e.g. `PER_MESSAGE_OVERHEAD = 4`) **once** per message:
   - Define a constant (class-level or module-level) such as `PER_MESSAGE_OVERHEAD = 4`.
   - When computing tokens for a single message, do something like:
     `message_tokens = sum(self._estimate_tokens(part.text) for part in message.parts) + PER_MESSAGE_OVERHEAD`.
2. Ensure that `_count_tokens_internal` does **not** call `_estimate_tokens` with any additional per-part overhead, so the overhead is only applied at the message level, not per part.

Adjust the exact placement of this logic to match how messages and parts are represented in the rest of `token_counter.py`.
</issue_to_address>

### Comment 3
<location path="astrbot/core/agent/context/compressor.py" line_range="255-264" />
<code_context>
-        except Exception as e:
-            logger.error(f"Failed to generate summary: {e}")
-            return messages
+        # Generate the cache key
+        cache_key = self._generate_cache_key(messages_to_summarize)
+        
</code_context>
<issue_to_address>
**issue (bug_risk):** Summary cache key can collide for different histories, leading to incorrect summaries being reused.

As implemented, the key based only on message count and a hash of the last message preview can collide when earlier messages change but the last message and count do not, causing stale summaries to be reused for different histories. Consider deriving the key from more of the history (e.g., hash of roles+contents for all, or at least last N messages) or from a monotonic conversation identifier if you have one.
</issue_to_address>

### Comment 4
<location path="astrbot/core/agent/context/compressor.py" line_range="123-124" />
<code_context>
+        
+        Adjusts the truncation strategy based on message count and current usage.
+        """
+        # Simple case: use the configured number of turns to truncate
+        return max(1, self.truncate_turns)


</code_context>
<issue_to_address>
**suggestion:** New state (`min_keep_turns`, `_last_truncate_turns`) and dynamic truncation comment are not reflected in the implementation.

The new fields and comments describe dynamic truncate behavior, but `_calculate_truncate_turns` still returns `max(1, self.truncate_turns)` and ignores `min_keep_turns`/`_last_truncate_turns`. Either integrate these fields into the calculation (e.g., limit `drop_turns` based on `len(messages)` and `min_keep_turns`) or remove/defer them to avoid misleading documentation and unused state.

Suggested implementation:

```python
    def _calculate_truncate_turns(self, messages: list[Message]) -> int:
        """Dynamically compute the number of turns to truncate.

        Adjusts the truncation strategy based on message count and current usage.
        """
        total_turns = len(messages)

        # Not enough turns available to truncate; return 0 directly
        if total_turns <= self.min_keep_turns:
            return 0

        # Maximum droppable turns (always keep at least min_keep_turns)
        max_drop_turns = max(0, total_turns - self.min_keep_turns)

        # Base strategy: use the configured truncate_turns, at least 1
        base_turns = max(1, self.truncate_turns)

        # Smooth against the previous truncation to avoid large swings
        last_turns = getattr(self, "_last_truncate_turns", None)
        if last_turns is not None:
            smoothed_turns = max(1, int((base_turns + last_turns) / 2))
        else:
            smoothed_turns = base_turns

        # The actual truncation cannot exceed this round's upper bound
        truncate_turns = min(max_drop_turns, smoothed_turns)

        # Allow returning 0 when max_drop_turns is nonzero but very small
        # (the max_drop_turns == 0 case is already handled above)
        return max(0, truncate_turns)

```

```python
        self.truncate_turns = truncate_turns
        self.compression_threshold = compression_threshold
        # New: minimum number of turns to keep, to avoid over-truncation
        self.min_keep_turns = max(0, min_keep_turns)
        # Track the last actual truncation size, used for smoothing
        self._last_truncate_turns = max(1, truncate_turns)

```

1. Ensure the `__init__` signature includes `min_keep_turns: int = 0` (or your preferred default), e.g.:
   `def __init__(..., truncate_turns: int = 1, compression_threshold: int = 0, min_keep_turns: int = 0) -> None:`
2. If the class declares attribute type annotations (e.g. `self.min_keep_turns: int` near the top of the class body), add annotations for `min_keep_turns` and `_last_truncate_turns` to keep typing consistent.
3. Confirm that the call site of `_calculate_truncate_turns` sets `self._last_truncate_turns = truncate_turns` after truncation completes (it appears to, from the snippet you provided), so the smoothing logic takes effect.
</issue_to_address>

### Comment 5
<location path="astrbot/core/agent/context/manager.py" line_range="82-85" />
<code_context>
+                # Optimization: use the cached token count or compute a fresh one
+                if trusted_token_usage > 0:
+                    total_tokens = trusted_token_usage
+                elif self._last_token_count is not None:
+                    # Quick check: reuse the cache if the message count is unchanged
+                    if len(result) == len(messages):
+                        total_tokens = self._last_token_count
+                    else:
+                        total_tokens = self.token_counter.count_tokens(result)
</code_context>
<issue_to_address>
**issue (bug_risk):** Reusing `_last_token_count` based only on list length can give incorrect token counts when content changes.

The reuse check `len(result) == len(messages)` ignores message content. If any message text changes while the count stays the same, `_last_token_count` may be reused incorrectly and the compressor will operate on stale token data.

Consider also tracking a simple fingerprint of the messages (e.g., a hash of `(role, content)` tuples or a monotonically increasing message version) and only reusing `_last_token_count` when that fingerprint matches, or limit reuse to workflows where messages are guaranteed immutable.
</issue_to_address>
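A sketch of the fingerprint-gated reuse suggested here, again modeling messages as plain (role, content) tuples for illustration; real Message objects would need a stable digest instead of `hash(tuple(...))`.

```python
class CachedTokenCount:
    """Hypothetical helper: reuse a count only when content is unchanged."""
    def __init__(self):
        self._fingerprint = None
        self._count = None

    def get(self, messages, compute):
        # Recompute only when the content fingerprint changes,
        # not merely when the list length changes.
        fp = hash(tuple(messages))
        if fp != self._fingerprint:
            self._fingerprint = fp
            self._count = compute(messages)
        return self._count
```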

### Comment 6
<location path="astrbot/core/agent/context/token_counter.py" line_range="67" />
<code_context>
+        self._hit_count = 0
+        self._miss_count = 0
+
+    def _get_cache_key(self, messages: list[Message]) -> int:
+        """Generate a cache key for messages.
+        
</code_context>
<issue_to_address>
**issue (complexity):** Consider strengthening the cache key and simplifying eviction logic so the token counter’s caching remains effective without introducing hard-to-debug incorrect counts and branching complexity.

You’re right that multimodal support is the main feature; the main risk now is that the cache can return *incorrect* counts because the key is too weak. You can keep caching and stats while reducing complexity/bugs by tightening the cache key and simplifying eviction.

### 1. Make the cache key deterministic and collision‑resistant

Currently:

```python
def _get_cache_key(self, messages: list[Message]) -> int:
    if not messages:
        return 0
    return hash((len(messages), str(messages[-1])[:100]))
```

Different histories with the same length and similar last message text can collide, giving wrong counts. You can still keep a cheap key but base it on all messages in a structured way:

```python
def _get_cache_key(self, messages: list[Message]) -> int:
    """Generate a cache key for messages based on full history structure."""
    if not messages:
        return 0

    h = 0
    for msg in messages:
        # Only use stable, cheap fields to build the key
        content_repr = (
            msg.content
            if isinstance(msg.content, str)
            else str(msg.content)
        )
        tool_repr = (
            tuple(sorted(tc.items())) if isinstance(tc, dict) else str(tc)
            for tc in (msg.tool_calls or [])
        )
        h = hash((
            h,
            msg.role,
            content_repr,
            tuple(tool_repr),
        ))
    return h
```

This keeps the cache but drastically lowers collision risk and makes behavior easier to reason about.

### 2. Simplify eviction logic

Current eviction:

```python
if len(self._cache) < self._cache_size:
    self._cache[cache_key] = total
elif self._cache_size > 0:
    # Simple cache eviction: clear half the entries
    keys_to_remove = list(self._cache.keys())[:self._cache_size // 2]
    for key in keys_to_remove:
        del self._cache[key]
    self._cache[cache_key] = total
```

You can keep a simple FIFO-ish eviction while reducing branching and temporary allocations:

```python
if self._cache_size > 0:
    if len(self._cache) >= self._cache_size:
        # Drop a few arbitrary entries to keep memory bounded
        for _ in range(max(1, self._cache_size // 10)):
            try:
                self._cache.pop(next(iter(self._cache)))
            except StopIteration:
                break
    self._cache[cache_key] = total
```

This removes the “clear half the cache” behavior and large `list(self._cache.keys())` allocations, while still bounding cache size and keeping the class behavior (caching + eviction) intact.
</issue_to_address>

### Comment 7
<location path="astrbot/core/agent/context/manager.py" line_range="79" />
<code_context>
-                total_tokens = self.token_counter.count_tokens(
-                    result, trusted_token_usage
-                )
+                # Optimization: use the cached token count or compute a fresh one
+                if trusted_token_usage > 0:
+                    total_tokens = trusted_token_usage
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating token-count caching and metrics tracking into dedicated helper classes so that `ContextManager` remains a thin orchestration layer with minimal branching and internal state.

You can keep all new behavior (caching, stats, logging) but move most of the state/branching out of `ContextManager` so it stays “lean” and focused on orchestration.

### 1. Move token-count caching into `EstimateTokenCounter`

Right now `process()` carries a fragile heuristic and branching:

```python
if trusted_token_usage > 0:
    total_tokens = trusted_token_usage
elif self._last_token_count is not None:
    if len(result) == len(messages):
        total_tokens = self._last_token_count
    else:
        total_tokens = self.token_counter.count_tokens(result)
else:
    total_tokens = self.token_counter.count_tokens(result)

self._last_token_count = total_tokens
```

This can become a simple call if the caching logic is encapsulated in `EstimateTokenCounter`, keyed by something stable (e.g. `id`s or a digest) rather than `len()`:

```python
# in EstimateTokenCounter
class EstimateTokenCounter:
    def __init__(self, ...):
        self._last_key: tuple[int, int] | None = None  # (len, hash)
        self._last_count: int | None = None

    def count_tokens_cached(
        self,
        messages: list[Message],
        trusted_token_usage: int = 0,
    ) -> int:
        if trusted_token_usage > 0:
            return trusted_token_usage

        key = (len(messages), hash(tuple(m.id for m in messages)))  # adjust to your msg model
        if self._last_key == key and self._last_count is not None:
            return self._last_count

        count = self.count_tokens(messages)
        self._last_key = key
        self._last_count = count
        return count
```

Then `ContextManager.process` becomes:

```python
# in ContextManager.process
if self.config.max_context_tokens > 0:
    total_tokens = self.token_counter.count_tokens_cached(
        result,
        trusted_token_usage=trusted_token_usage,
    )

    if self.compressor.should_compress(
        result,
        total_tokens,
        self.config.max_context_tokens,
    ):
        result = await self._run_compression(result, total_tokens)
```

This removes branches and state from `ContextManager` while keeping the optimization and avoiding the fragile `len(result) == len(messages)` heuristic.

### 2. Keep stats but move them into a small metrics object

Instead of `ContextManager` owning `_compression_count`, `_last_token_count`, `get_stats`, `reset_stats`, and reaching into `token_counter` via `hasattr`, you can centralize stats in a dedicated, lightweight helper:

```python
# metrics.py
@dataclass
class ContextMetrics:
    compression_count: int = 0
    last_token_count: int | None = None

    def as_dict(self, token_counter) -> dict:
        stats = {
            "compression_count": self.compression_count,
            "last_token_count": self.last_token_count,
        }
        if hasattr(token_counter, "get_cache_stats"):
            stats["token_counter_cache"] = token_counter.get_cache_stats()
        return stats

    def reset(self, token_counter) -> None:
        self.compression_count = 0
        self.last_token_count = None
        if hasattr(token_counter, "clear_cache"):
            token_counter.clear_cache()
```

`ContextManager` then becomes:

```python
class ContextManager:
    def __init__(...):
        ...
        self.metrics = ContextMetrics()

    async def _run_compression(...):
        logger.debug("Compress triggered, starting compression...")
        self.metrics.compression_count += 1

        messages = await self.compressor(messages)
        tokens_after = self.token_counter.count_tokens(messages)
        self.metrics.last_token_count = tokens_after

        ...

        if self.compressor.should_compress(...):
            ...
            self.metrics.last_token_count = self.token_counter.count_tokens(messages)
        return messages

    def get_stats(self) -> dict:
        return self.metrics.as_dict(self.token_counter)

    def reset_stats(self) -> None:
        self.metrics.reset(self.token_counter)
```

This keeps all existing behavior (stats, logging, cache stats exposure) while:

- Removing raw counters and bookkeeping fields from `ContextManager`.
- Making `_run_compression` again “readable at a glance” as compress → count → maybe truncate.
- Making future metric additions localized to `ContextMetrics`.
</issue_to_address>



@firenick15

Why not use tiktoken to estimate tokens? Wouldn't that be more accurate?

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

The pull request introduces significant optimizations to context compression, including an improved token counter with caching and multimodal support, and enhanced summary and truncation compressors. Specifically, the EstimateTokenCounter now supports caching, more accurate token estimation for mixed content, and multimodal message parts (images, audio, thinking parts). The LLMSummaryCompressor now includes a summary cache to avoid redundant LLM calls, and the TruncateByTurnsCompressor has new attributes for dynamic truncation. The ContextManager has been updated to leverage these new features, including caching token counts and tracking compression statistics. New test fixtures and comprehensive unit tests have been added to validate these changes.

However, several issues and improvements were noted in the review: the cache key generation in both LLMSummaryCompressor and EstimateTokenCounter is too simplistic and prone to collisions, potentially leading to incorrect cached data; the ContextManager's internal token count caching (_last_token_count) is problematic due to staleness and should be removed in favor of EstimateTokenCounter's more robust caching; an unused min_keep_turns variable in TruncateByTurnsCompressor should be removed; and the docstring for _calculate_truncate_turns in TruncateByTurnsCompressor is misleading and needs to be updated to reflect its current simple implementation.

Comment on lines +308 to +318

    def _generate_cache_key(self, messages: list[Message]) -> str:
        """Generate the cache key.

        Uses the message count and a hash of the last message as the key.
        """
        if not messages:
            return ""
        # Generate the cache key in a compact way
        msg_count = len(messages)
        last_msg_preview = str(messages[-1])[:50] if messages else ""
        return f"{msg_count}:{hash(last_msg_preview)}"

high

The current cache-key logic in _generate_cache_key depends only on the message count and a preview of the last message's content, which can cause key collisions. For example, if two different message lists have the same length and their last messages share the same first 50 characters, they get the same cache key even though the summaries that should be generated differ, so a wrong cached summary is returned.
Consider deriving the key from something that uniquely identifies the entire message list, e.g. a hash over all message contents.
A more robust example implementation:

import hashlib
import json

def _generate_cache_key(self, messages: list[Message]) -> str:
    """Generate the cache key."""
    if not messages:
        return ""
    
    # Serialize all messages and hash the result
    messages_json = json.dumps([msg.model_dump() for msg in messages], sort_keys=True)
    return hashlib.sha256(messages_json.encode('utf-8')).hexdigest()

Comment on lines +67 to +75
def _get_cache_key(self, messages: list[Message]) -> int:
"""Generate a cache key for messages.

Uses message content hash for quick cache lookup.
"""
# 使用消息数量和最后一条消息的内容作为简单缓存键
if not messages:
return 0
return hash((len(messages), str(messages[-1])[:100]))

high

The current cache-key logic in _get_cache_key depends only on the list length and a preview of the last message's content, which can cause collisions. If a message in the middle or at the beginning of the list changes, or if the last message changes beyond its first 100 characters, the key stays the same and a wrong (stale) token count is returned.
Consider a more robust hashing strategy to keep the cache accurate.
For example, hash the contents of all messages:

import hashlib
import json

def _get_cache_key(self, messages: list[Message]) -> int:
    """Generate a cache key for messages."""
    if not messages:
        return 0
    
    # Hash all messages to obtain a reliable cache key
    # Note: this may have a performance cost and should be benchmarked
    messages_json = json.dumps([msg.model_dump() for msg in messages], sort_keys=True)
    return hash(messages_json)

Comment on lines +75 to +76
        # New: minimum number of turns to keep, to avoid over-truncation
        self.min_keep_turns = 2

medium

The min_keep_turns attribute is initialized here but does not appear to be used anywhere in the TruncateByTurnsCompressor class. If there is no plan to use it, consider removing it to keep the code clean.

Comment on lines +118 to +124

    def _calculate_truncate_turns(self, messages: list[Message]) -> int:
        """Dynamically compute the number of turns to truncate.

        Adjusts the truncation strategy based on message count and current usage.
        """
        # Simple case: use the configured number of turns to truncate
        return max(1, self.truncate_turns)

medium

The docstring claims the method "adjusts the truncation strategy based on message count and current usage", but the current implementation, return max(1, self.truncate_turns), is simple and does not implement that logic. To avoid confusion, update the docstring to match the current implementation, or implement the smarter logic later.

@rin259
Contributor Author

rin259 commented Mar 20, 2026

Good question! 👋

The goal of this PR is to optimize the existing algorithm without adding external dependencies. Main improvements:

  1. Caching - fewer redundant token computations; this is the main performance win
  2. Better estimation - more accurate than the previous naive divide-by-4
  3. Statistics - make it easy to monitor compression effectiveness

Follow-up directions

  • Could consider introducing  or  for exact counting (requires adding a dependency)
  • Or use the model's  return value (if the API supports it)

The advantage of the current approach is zero dependencies and plug-and-play, which makes it a good baseline optimization, meow~

Bug fixes:
1. Token counter cache key
   - Generate the cache key from the full message history (role + content + tool_calls)
   - Avoids identical cache keys for different histories

2. Message overhead accounting
   - Remove the overhead parameter from _estimate_tokens
   - Apply PER_MESSAGE_OVERHEAD (4 tokens) once at the message level instead
   - Avoids charging overhead for every TextPart/ThinkPart

3. Summary cache key
   - Generate the cache key from the full history
   - Avoids collisions between different histories

4. ContextManager fingerprint
   - Use a message fingerprint to detect content changes
   - Reuse the cached token count only when the fingerprint matches

5. New unit tests
   - Different messages produce different cache keys
   - Identical messages produce identical cache keys
   - Summary cache key generation
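The message-level overhead change (item 2) can be sketched as follows; PER_MESSAGE_OVERHEAD matches the commit message, while the function name and the default length-based estimator are illustrative.

```python
PER_MESSAGE_OVERHEAD = 4  # role/content wrapper cost, charged once per message

def count_message_tokens(parts, estimate=len) -> int:
    # Overhead is applied once at the message level, so a message with
    # several parts no longer pays the wrapper cost per TextPart/ThinkPart.
    return sum(estimate(p) for p in parts) + PER_MESSAGE_OVERHEAD
```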
@rin259
Contributor Author

rin259 commented Mar 20, 2026

Tracking #6660


Labels

area:core The bug / feature is about astrbot's core, backend size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants