
perf: Optimize context compression module (v2) #6660

Open

rin259 wants to merge 4 commits into AstrBotDevs:master from rin259:perf/context-compression-v2

Conversation


@rin259 rin259 commented Mar 20, 2026

Optimizations

1. Improved token estimation algorithm

  • More accurate estimation of mixed Chinese/English text
  • Distinct ratios for Chinese characters, English letters, digits, and special characters
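A mixed-script estimator of this kind can be sketched as follows. This is an illustration, not the PR's implementation; the per-class ratios below are assumed values, and the real `EstimateTokenCounter` may classify and weight characters differently:

```python
import re

# Illustrative per-character token ratios; the PR's actual values may differ.
CJK_RE = re.compile(r"[\u4e00-\u9fff]")

def estimate_tokens(text: str) -> int:
    """Estimate a token count by classifying each character and applying
    a class-specific chars-per-token ratio."""
    cjk = letters = digits = other = 0
    for ch in text:
        if CJK_RE.match(ch):
            cjk += 1                      # CJK: ~1.5 chars per token
        elif ch.isascii() and ch.isalpha():
            letters += 1                  # English letters: ~4 per token
        elif ch.isdigit():
            digits += 1                   # digits: ~3 per token
        elif not ch.isspace():
            other += 1                    # punctuation/special: ~2 per token
    return int(cjk / 1.5 + letters / 4 + digits / 3 + other / 2) + 1
```

Separating the classes matters because a heuristic tuned for English (e.g. "4 characters per token") badly underestimates CJK text, where each character is often close to a full token.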

2. Caching

  • Token count cache (EstimateTokenCounter)
  • Summary cache (LLMSummaryCompressor)

3. ContextManager optimizations

  • Message fingerprinting to reduce redundant token counting
  • Compression statistics
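The fingerprinting idea can be sketched as below. This is a simplified model under assumed names (`history_fingerprint`, `FingerprintingCounter`): the real ContextManager works with Message objects and the counter's own cache key, not plain dicts:

```python
import hashlib

def history_fingerprint(messages: list[dict]) -> str:
    """Hash the full message history; identical histories yield identical
    fingerprints, so an unchanged history can skip recounting."""
    h = hashlib.sha256()
    for msg in messages:
        h.update(str(msg.get("role", "")).encode("utf-8"))
        h.update(b"\x1f")  # separator so ("ab", "c") != ("a", "bc")
        h.update(str(msg.get("content", "")).encode("utf-8"))
        h.update(b"\x1e")
    return h.hexdigest()

class FingerprintingCounter:
    """Wraps an expensive counting function and reuses the last result
    while the history's fingerprint is unchanged."""

    def __init__(self, count_fn):
        self._count_fn = count_fn
        self._last_fp: str | None = None
        self._last_count = 0

    def count(self, messages: list[dict]) -> int:
        fp = history_fingerprint(messages)
        if fp != self._last_fp:
            self._last_fp = fp
            self._last_count = self._count_fn(messages)
        return self._last_count
```

Between turns the history usually only grows by one or two messages, so a cheap hash check avoids re-walking the entire history on every request.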

4. Bug fixes

  • Fixed a cache key collision (keys are now derived from the full message history)
  • Fixed a duplicate overhead calculation

5. Unit tests

  • 18 new test cases

Testing

pytest tests/test_context_compression.py -v

Closes old PR #6655 (superseded due to conflicts)

Summary by Sourcery

Optimize the context compression pipeline with improved token estimation, caching, and runtime statistics.

New Features:

  • Introduce a cacheable token estimation counter with per-message overhead accounting and cache statistics APIs.
  • Add summary caching and cache management to the LLM-based summary compressor to avoid redundant summarization calls.
  • Expose context manager statistics and reset APIs for observing and managing compression behavior.

Bug Fixes:

  • Prevent cache key collisions in token and summary caching by basing keys on full message history structure.
  • Avoid duplicate token overhead calculations during compression by reusing cached counts where possible.

Enhancements:

  • Refine token estimation ratios for mixed Chinese/English text, digits, and special characters to improve context size estimation accuracy.
  • Reduce redundant token counting in the context manager by fingerprinting message histories and reusing previous counts.
  • Improve compression logging with compression sequencing and clearer token usage reporting.

Tests:

  • Add a dedicated test suite for context compression covering token estimation accuracy, cache behavior, compressor triggering, summary caching, and context manager statistics.

- Improved token estimation algorithm (separate ratios for Chinese, English, digit, and special characters)
- Added token count and summary caches
- Added fingerprinting to ContextManager to reduce redundant computation
- Fixed cache key collision and duplicate overhead calculation bugs
@auto-assign auto-assign bot requested review from Fridemn and advent259141 March 20, 2026 03:14
@dosubot dosubot bot added the size:XL This PR changes 500-999 lines, ignoring generated files. label Mar 20, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly optimizes the performance and accuracy of the context compression module. By introducing an improved token estimation algorithm, comprehensive caching, and internal ContextManager refinements, the system manages conversation context more efficiently and avoids unnecessary computation, while also fixing critical caching and calculation bugs, improving overall user experience and resource utilization.

Highlights

  • Improved token estimation: more accurate estimation of mixed Chinese/English text, with distinct ratios for Chinese characters, English letters, digits, and special characters.
  • Caching: added caches for token counts (EstimateTokenCounter) and LLM summaries (LLMSummaryCompressor) to avoid redundant computation.
  • ContextManager optimizations: message fingerprinting to reduce redundant token counting, plus compression statistics.
  • Bug fixes: resolved a cache key collision (by deriving keys from the full message history) and a duplicate overhead calculation.
  • Unit tests: 18 new test cases covering the optimized context compression module.

@dosubot dosubot bot added area:core The bug / feature is about astrbot's core, backend area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. labels Mar 20, 2026
@sourcery-ai sourcery-ai bot left a comment (Contributor)
Hey - I've found 2 issues, and left some high level feedback:

  • ContextManager currently relies on the token counter’s private method _get_cache_key for fingerprints; consider exposing a public fingerprint/cache-key API on EstimateTokenCounter instead so the manager doesn’t depend on another class’s internals.
  • _generate_summary_cache_key only uses the first 50 characters of each message’s content and ignores provider/instruction configuration, which can lead to summary cache collisions or stale summaries if those change; consider hashing full content plus relevant config to make the cache safer.
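A safer key along the lines suggested might look like the sketch below. The field names (`provider_id`, `instruction`) are illustrative assumptions, not the module's actual API; the point is that the full content and the relevant configuration both feed the hash:

```python
import hashlib
import json

def summary_cache_key(
    messages: list[dict], provider_id: str, instruction: str
) -> str:
    """Derive the cache key from full message content plus the provider
    and instruction config, so changing either invalidates old summaries."""
    payload = {
        "provider": provider_id,
        "instruction": instruction,
        "messages": [
            {"role": m.get("role", ""), "content": str(m.get("content", ""))}
            for m in messages
        ],
    }
    # sort_keys makes the serialization deterministic across runs
    blob = json.dumps(payload, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()
```

Hashing the full content costs microseconds per call, which is negligible next to the LLM summarization call the cache is protecting, so truncating to 50 characters buys essentially nothing while risking collisions.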
## Individual Comments

### Comment 1
<location path="tests/test_context_compression.py" line_range="241-250" />
<code_context>
+class TestLLMSummaryCompressor:
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for error handling, cache clearing, and `should_compress` behavior in `LLMSummaryCompressor`.

Right now we only cover the happy path and basic caching. It would be valuable to: (1) add a test where `provider.text_chat` raises to confirm `__call__` returns the original messages; (2) verify `clear_cache()` actually invalidates entries by asserting the provider is called again (e.g., `call_count` increases after clearing); and (3) add focused tests for `should_compress` using edge values around the threshold for `current_tokens` vs `max_tokens` to validate the decision boundary.

Suggested implementation:

```python
class TestLLMSummaryCompressor:
    """Test cases for LLM summary compressor."""

    def setup_method(self):
        """Setup test fixtures."""
        self.mock_provider = Mock()
        self.mock_provider.text_chat = AsyncMock()
        self.mock_provider.text_chat.return_value = Mock(completion_text="这是一段摘要。")

        # NOTE: adjust arguments here if LLMSummaryCompressor requires more params.
        self.compressor = LLMSummaryCompressor(
            provider=self.mock_provider,
        )

    @pytest.mark.asyncio
    async def test_error_handling_returns_original_messages_on_provider_failure(self):
        """When provider.text_chat raises, __call__ should return original messages unchanged."""
        messages = [
            Message(role="user", content="需要被压缩的对话内容"),
            Message(role="assistant", content="助手的长回复"),
        ]

        # Force the provider to fail
        self.mock_provider.text_chat.side_effect = RuntimeError("LLM error")

        # Call compressor (adjust call signature if needed)
        result_messages = await self.compressor(
            messages=messages,
            max_tokens=2048,
            language="zh",
        )

        # Should fall back to returning the original messages
        assert result_messages == messages

    @pytest.mark.asyncio
    async def test_clear_cache_causes_provider_to_be_called_again(self):
        """clear_cache should invalidate cached summaries so provider is called again."""
        messages = [
            Message(role="user", content="第一条消息"),
            Message(role="assistant", content="第一条回复"),
        ]

        # First call: should hit provider
        result1 = await self.compressor(
            messages=messages,
            max_tokens=2048,
            language="zh",
        )
        assert self.mock_provider.text_chat.call_count == 1
        assert "这是一段摘要" in result1[-1].content

        # Change provider output to detect cache behavior
        self.mock_provider.text_chat.reset_mock()
        self.mock_provider.text_chat.return_value = Mock(completion_text="新的摘要内容")

        # Second call without clearing cache: should use cached result, not call provider
        result2 = await self.compressor(
            messages=messages,
            max_tokens=2048,
            language="zh",
        )
        assert self.mock_provider.text_chat.call_count == 0
        # Still old summary because of cache
        assert "这是一段摘要" in result2[-1].content

        # Clear cache and call again: provider should be hit and new summary used
        self.compressor.clear_cache()

        result3 = await self.compressor(
            messages=messages,
            max_tokens=2048,
            language="zh",
        )
        assert self.mock_provider.text_chat.call_count == 1
        assert "新的摘要内容" in result3[-1].content

    def test_should_compress_below_threshold(self):
        """should_compress should return False when current_tokens is just below threshold."""
        max_tokens = 1000

        # Derive threshold ratio from compressor if available; otherwise fall back to a default.
        ratio = getattr(self.compressor, "compression_ratio_threshold", 0.8)
        threshold_tokens = int(max_tokens * ratio)

        current_tokens = threshold_tokens - 1  # just below
        assert self.compressor.should_compress(current_tokens=current_tokens, max_tokens=max_tokens) is False

    def test_should_compress_at_threshold(self):
        """Validate decision boundary when current_tokens is at the threshold."""
        max_tokens = 1000
        ratio = getattr(self.compressor, "compression_ratio_threshold", 0.8)
        threshold_tokens = int(max_tokens * ratio)

        current_tokens = threshold_tokens  # exactly at threshold

        # Most implementations consider reaching the threshold as needing compression.
        assert self.compressor.should_compress(current_tokens=current_tokens, max_tokens=max_tokens) is True

    def test_should_compress_above_threshold(self):
        """should_compress should return True when current_tokens is just above threshold."""
        max_tokens = 1000
        ratio = getattr(self.compressor, "compression_ratio_threshold", 0.8)
        threshold_tokens = int(max_tokens * ratio)

        current_tokens = threshold_tokens + 1  # just above
        assert self.compressor.should_compress(current_tokens=current_tokens, max_tokens=max_tokens) is True

```

1. Ensure `pytest` is imported at the top of `tests/test_context_compression.py` if it is not already:
   - `import pytest`
2. Confirm the actual `LLMSummaryCompressor` constructor signature:
   - If it requires additional parameters (e.g., `tokenizer`, `summary_language`, `summary_max_tokens`, etc.), add them in `setup_method` where `self.compressor = LLMSummaryCompressor(...)` is created.
3. Adjust the `__call__` signature used in the tests:
   - Update the calls to `await self.compressor(messages=..., max_tokens=..., language=...)` to match the real `__call__` parameters (names and count).
4. If `LLMSummaryCompressor` uses a different attribute name than `compression_ratio_threshold` for the threshold ratio, update the tests:
   - Replace `"compression_ratio_threshold"` with the actual attribute name that represents the compression threshold.
5. If `__call__` returns something other than a list of `Message` objects (e.g., a tuple with metadata), adapt the assertions to check the correct part of the return value that contains the compressed messages.
</issue_to_address>

### Comment 2
<location path="astrbot/core/agent/context/token_counter.py" line_range="65" />
<code_context>
+        self._hit_count = 0
+        self._miss_count = 0
+
+    def _get_cache_key(self, messages: list[Message]) -> int:
+        """Generate a cache key for messages based on full history structure.
+        
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the cache key computation and cache eviction policy by using an explicit structural fingerprint plus a small LRU cache for more deterministic and maintainable behavior.

You can keep all new features (better heuristic, caching, stats) but reduce complexity and improve determinism by:

1. Replacing the rolling hash with an explicit, structural fingerprint.
2. Using a simple LRU-style cache with `OrderedDict` instead of half-clearing arbitrary keys.

### 1) Simplify `_get_cache_key` with an explicit fingerprint

Instead of nested re‑hashing and `str(...)`, build a simple, immutable structure and hash that. This is easier to reason about and avoids depending on `str()` representations:

```python
from typing import Hashable

def _message_fingerprint(msg: Message) -> Hashable:
    # Normalize content
    if isinstance(msg.content, str):
        content_repr: Hashable = msg.content
    elif isinstance(msg.content, list):
        # Keep explicit structure, don’t rely on str(list)
        parts = []
        for part in msg.content:
            if isinstance(part, TextPart):
                parts.append(("text", part.text))
            elif isinstance(part, ThinkPart):
                parts.append(("think", part.think))
            elif isinstance(part, ImageURLPart):
                parts.append(("image", part.image_url))
            elif isinstance(part, AudioURLPart):
                parts.append(("audio", part.audio_url))
            else:
                parts.append(("other", repr(part)))
        content_repr = tuple(parts)
    else:
        content_repr = repr(msg.content)

    # Normalize tool_calls (dicts sorted by key, others by repr)
    if msg.tool_calls:
        tools = []
        for tc in msg.tool_calls:
            if isinstance(tc, dict):
                tools.append(("dict", tuple(sorted(tc.items()))))
            else:
                tools.append(("other", repr(tc)))
        tool_repr: Hashable = tuple(tools)
    else:
        tool_repr = ()

    return (msg.role, content_repr, tool_repr)

def _get_cache_key(self, messages: list[Message]) -> int:
    if not messages:
        return 0
    fingerprint = tuple(_message_fingerprint(m) for m in messages)
    return hash(fingerprint)
```

This keeps the “full-history structure” idea while making the identity explicit and deterministic.

### 2) Replace custom eviction with a tiny LRU cache

Use `collections.OrderedDict` to get deterministic, straightforward eviction instead of clearing half of the cache based on dict key order:

```python
from collections import OrderedDict

class EstimateTokenCounter:
    def __init__(self, cache_size: int = 100) -> None:
        self._cache: "OrderedDict[int, int]" = OrderedDict()
        self._cache_size = cache_size
        self._hit_count = 0
        self._miss_count = 0

    def count_tokens(
        self, messages: list[Message], trusted_token_usage: int = 0
    ) -> int:
        if trusted_token_usage > 0:
            return trusted_token_usage

        cache_key = self._get_cache_key(messages)

        if cache_key in self._cache:
            self._hit_count += 1
            # mark as recently used
            self._cache.move_to_end(cache_key)
            return self._cache[cache_key]

        self._miss_count += 1
        total = self._count_tokens_internal(messages)

        if self._cache_size > 0:
            if len(self._cache) >= self._cache_size:
                # evict least-recently-used
                self._cache.popitem(last=False)
            self._cache[cache_key] = total

        return total
```

This keeps caching and stats behavior but removes the non-obvious “clear half of the cache” policy and reliance on incidental dict ordering.
</issue_to_address>



@gemini-code-assist gemini-code-assist bot left a comment (Contributor)

Code Review

Thank you for this performance optimization of the context compression module. The improvements are comprehensive: more precise token estimation, caching for both token counts and summaries, message fingerprinting to reduce redundant computation, and thorough unit tests. This is very helpful for performance and stability.

The overall code quality is high. I have only one suggestion, regarding the cache eviction policy in token_counter.py, aimed at improving efficiency and keeping the code style consistent. Please see the specific review comment.

Comment on lines 143 to +145

    # calculate compress rate
    compress_rate = (tokens_after_summary / self.config.max_context_tokens) * 100

Check failure — Code scanning / CodeQL: Clear-text logging of sensitive information (High)

This expression logs sensitive data (secret) as clear text. (reported on two expressions)

rin259 commented Mar 21, 2026

CodeQL False-Positive Fix

The CodeQL py/clear-text-logging-sensitive-data alert is a false positive — the prev_tokens parameter is just an integer token count, not an auth token. CodeQL's heuristic incorrectly flags variable names containing "token".

Fix PR: #6736 — renamed prev_tokens to token_count_before in _run_compression().

Once #6736 is merged, the CodeQL check should pass.


rin259 commented Mar 21, 2026

Update: Combined Fix PR #6738

The previous fix PRs #6736 and #6737 have been superseded by #6738, which combines both the CodeQL fix and the format-check fix in one PR.

Please review and merge #6738 instead.

