
feat(telegram): supports sendMessageDraft API #5726

Merged
Soulter merged 5 commits into AstrBotDevs:master from camera-2018:master
Mar 4, 2026

Conversation

@camera-2018
Contributor

@camera-2018 camera-2018 commented Mar 3, 2026

Telegram's existing streaming-output approach uses send_message + edit_message_text, which suffers from message flicker, push-notification noise, and API limits on edit frequency. Telegram Bot API v9.3 adds sendMessageDraft, which can push an animated draft preview in private chats without creating a real message, giving an experience much closer to a "typewriter" effect. (Requires the streaming output switch under AI Settings - Other Settings to be enabled.)

Effect on mobile

22a4b26f67c0e005797059a93a681372.mp4

Effect on desktop

Video_5512051703405.1.1.mp4

Modifications

astrbot/core/platform/sources/telegram/tg_event.py

  • Added a module-level _allocate_draft_id() function that generates a globally incrementing draft_id, so draft changes are animated on the Telegram side

  • Added a _send_message_draft() method wrapping the Bot.send_message_draft API call

  • Added a _send_streaming_draft() method for private-chat streaming:

    • Uses an independent asynchronous sender loop (_draft_sender_loop) that sends the latest buffer content as a draft every 0.5s, fully decoupling LLM token arrival rate from Telegram API network latency
    • After streaming ends, sends a real send_message (MarkdownV2) to preserve the final content (drafts are temporary and disappear)
    • Handles break separators correctly
  • Changed the send_streaming() entry point: private chats automatically use _send_streaming_draft; group chats fall back to _send_streaming_edit

  • After streaming ends, first sends a draft containing only a ⏳ emoji to clear the draft display, then sends the real message, so the draft text and the real message are never visible at the same time (the "double message" problem // todo: likely a Telegram mobile-client bug, hence this workaround)

  • During streaming, drafts are rendered and sent as MarkdownV2 (matching the final message format), falling back to plain text if markdownify fails

  • This is NOT a breaking change.
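The sender-loop idea above can be sketched with plain asyncio, independent of Telegram. This is a minimal illustration of the decoupling, not the PR's actual code; the names (`stream_with_sender`, `flush`) are made up for the example:

```python
import asyncio

async def stream_with_sender(tokens, flush, interval=0.02):
    """Buffer incoming tokens; an independent task flushes the latest
    buffer snapshot every `interval` seconds, so a slow flush never
    blocks token arrival."""
    buffer = []
    last_sent = ""
    done = False

    async def sender_loop():
        nonlocal last_sent
        while not done:
            await asyncio.sleep(interval)
            text = "".join(buffer)
            if text and text != last_sent:  # only flush on change
                await flush(text)
                last_sent = text

    task = asyncio.create_task(sender_loop())
    try:
        async for tok in tokens:
            buffer.append(tok)  # producer never waits on the network
    finally:
        done = True
        await task
    return "".join(buffer)  # final text, sent as the "real" message

async def demo():
    sent = []

    async def flush(text):
        sent.append(text)  # stands in for the draft API call

    async def tokens():
        for t in ["Hel", "lo ", "wor", "ld"]:
            await asyncio.sleep(0.05)  # simulated LLM token cadence
            yield t

    final = await stream_with_sender(tokens(), flush)
    return final, sent
```

Each flush sees a prefix of the final text, and the producer loop only ever appends to the buffer, which is the same "periodically send the latest buffer" semantics described for _draft_sender_loop.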

Screenshots or Test Results


Checklist

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

- Add a _send_message_draft method wrapping the Telegram Bot API sendMessageDraft
- Private-chat streaming uses sendMessageDraft to push draft animations; group chats keep the edit_message_text fallback
- An independent asynchronous sender loop (_draft_sender_loop) pushes the latest buffer content at a fixed interval,
  fully decoupling token arrival rate from API network latency
- After streaming ends, a real message is sent to preserve the final content (drafts are temporary)
- A module-level incrementing draft_id replaces random generation, keeping the Telegram-side animation continuous
@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 3, 2026
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request aims to significantly improve the streaming-output user experience in Telegram private chats. By leveraging the sendMessageDraft feature added in Telegram Bot API v9.3, it resolves the flicker, notification noise, and API rate-limit issues of the existing send_message + edit_message_text combination. In private chats, the new implementation provides a draft-preview animation closer to a "typewriter" effect, while an independent asynchronous sender mechanism keeps performance and responsiveness smooth.

Highlights

  • Private-chat streaming optimization: Introduces the new sendMessageDraft API for a smoother streaming experience in Telegram private chats, resolving message flicker, notification noise, and edit-frequency limits.
  • Independent async sender loop: Implements an independent asynchronous sender loop (_draft_sender_loop) for draft messages, decoupling LLM token generation speed from Telegram API network latency for a smoother "typewriter" effect.
  • Smart streaming strategy: Private chats automatically use _send_streaming_draft for streaming, while group chats fall back to the existing edit_message_text approach, matching best practice for each chat type.
  • Draft ID management: Adds a module-level _allocate_draft_id() function that generates a globally incrementing draft_id, ensuring the draft animation displays correctly.
  • Dependency update: Updates the python-telegram-bot dependency to version 22.6 to support the new sendMessageDraft API.
Changelog
  • astrbot/core/platform/sources/telegram/tg_event.py
    • Added the _allocate_draft_id() function to manage the draft_id required by sendMessageDraft.
    • Added the _send_message_draft() method wrapping the Bot.send_message_draft API call.
    • Implemented the _send_streaming_draft() method, designed for private chats, which sends draft messages through an independent asynchronous loop and sends the final real message once streaming ends.
    • Changed send_streaming() to call _send_streaming_draft in private chats and fall back to _send_streaming_edit in group chats.
  • pyproject.toml
    • Updated the python-telegram-bot version requirement to 22.6.
  • requirements.txt
    • Updated the python-telegram-bot version requirement to 22.6.
  • tests/fixtures/mocks/telegram.py
    • Added a send_message_draft method to the Bot class mock.
Activity
  • No comments or review activity on this pull request since it was created.
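The mock addition mentioned for tests/fixtures/mocks/telegram.py is not shown in this thread; one plausible shape, using unittest.mock.AsyncMock, is sketched below. This is an assumption about the fixture's structure, not the repository's actual code:

```python
import asyncio
from unittest.mock import AsyncMock

class MockBot:
    """Minimal stand-in for telegram.Bot in tests (illustrative shape,
    not the repository's actual fixture)."""
    def __init__(self):
        self.send_message = AsyncMock(return_value=None)
        # New in this PR: the draft API also needs a mock so
        # _send_streaming_draft can be exercised without the network.
        self.send_message_draft = AsyncMock(return_value=True)

bot = MockBot()
result = asyncio.run(bot.send_message_draft(chat_id=1, draft_id=1, text="hi"))
```

AsyncMock records each await, so a test can then assert on bot.send_message_draft.await_count or the arguments it received.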

@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Mar 3, 2026
@dosubot

dosubot bot commented Mar 3, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.


Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

Thanks for your contribution! This PR greatly improves the streaming-output experience in Telegram private chats by using the sendMessageDraft API; the design is clear and the implementation is complete.

I reviewed the code and have the following suggestions to help further improve code quality:

  1. In the _send_streaming_draft method, the logic for sending the final message is duplicated.
  2. In _draft_sender_loop, the exception handling for failed draft sends could be slightly improved by adding logging to ease debugging.

See the inline comments for specific suggestions. Overall, a great update!
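The second suggestion amounts to replacing a bare except/pass around the draft send with a logged variant. A minimal sketch under that assumption (the helper name and shape are illustrative, not the PR's final code):

```python
import asyncio
import logging

logger = logging.getLogger("telegram.draft")

async def try_send_draft(send, **kwargs):
    """Best-effort draft send: drafts are cosmetic, so a failure is
    logged for debugging instead of aborting the stream."""
    try:
        await send(**kwargs)
        return True
    except Exception as e:
        logger.debug("draft send failed: %s", e)  # keep streaming alive
        return False
```

Logging at debug level keeps the stream's hot path quiet in production while still leaving a trail when diagnosing flaky draft sends.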

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • In send_streaming, you now fully handle streaming via _send_streaming_draft / _send_streaming_edit but still return await super().send_streaming(generator, use_fallback) afterward, which will run with an already-consumed generator; consider removing the super() call or restructuring to avoid double-handling.
  • The global _next_draft_id counter in _allocate_draft_id is a simple module-level mutable state; if this code can be executed concurrently across threads or processes, you may want to make draft ID allocation explicitly process- or instance-scoped to avoid collisions.
  • The is_private check uses self.get_message_type() != MessageType.GROUP_MESSAGE to decide whether to call sendMessageDraft; if MessageType can represent other non-private contexts (e.g., channels/supergroups), it might be safer to explicitly check for a private/DM type instead of relying on != GROUP_MESSAGE.
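The first bullet is easy to reproduce in isolation: an async generator yields nothing once it has been drained, so a second streaming pass over the same generator object would silently do no work:

```python
import asyncio

async def gen():
    # Stands in for the LLM token stream.
    for chunk in ["a", "b", "c"]:
        yield chunk

async def consume(g):
    return [c async for c in g]

async def main():
    g = gen()
    first = await consume(g)   # drains the generator
    second = await consume(g)  # already exhausted: yields nothing
    return first, second

first, second = asyncio.run(main())
```

So the trailing super().send_streaming(generator, ...) call would iterate an empty stream rather than raise, making the double-handling easy to miss in testing.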
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `send_streaming`, you now fully handle streaming via `_send_streaming_draft` / `_send_streaming_edit` but still `return await super().send_streaming(generator, use_fallback)` afterward, which will run with an already-consumed generator; consider removing the `super()` call or restructuring to avoid double-handling.
- The global `_next_draft_id` counter in `_allocate_draft_id` is a simple module-level mutable state; if this code can be executed concurrently across threads or processes, you may want to make draft ID allocation explicitly process- or instance-scoped to avoid collisions.
- The `is_private` check uses `self.get_message_type() != MessageType.GROUP_MESSAGE` to decide whether to call `sendMessageDraft`; if `MessageType` can represent other non-private contexts (e.g., channels/supergroups), it might be safer to explicitly check for a private/DM type instead of relying on `!= GROUP_MESSAGE`.

## Individual Comments

### Comment 1
<location path="astrbot/core/platform/sources/telegram/tg_event.py" line_range="426" />
<code_context>
+
+        return await super().send_streaming(generator, use_fallback)
+
+    async def _send_streaming_draft(
+        self,
+        user_name: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared MessageChain item handling, simplifying the draft sender loop, and localizing draft ID allocation to reduce duplication and make the streaming logic easier to follow and maintain.

You can reduce complexity and duplication without changing behavior by:

### 1. Extracting shared `MessageChain` processing

The inner `for i in chain.chain` loop is essentially duplicated between `_send_streaming_draft` and `_send_streaming_edit`. You can factor it out into a small helper that appends text and sends media; the caller just passes how to accumulate text and any extra context.

```python
async def _process_message_chain_items(
    self,
    chain: MessageChain,
    payload: dict[str, Any],
    user_name: str,
    message_thread_id: str | None,
    append_text: Callable[[str], None],
) -> None:
    for i in chain.chain:
        if isinstance(i, Plain):
            append_text(i.text)
        elif isinstance(i, Image):
            image_path = await i.convert_to_file_path()
            await self._send_media_with_action(
                self.client,
                ChatAction.UPLOAD_PHOTO,
                self.client.send_photo,
                user_name=user_name,
                photo=image_path,
                **cast(Any, payload),
            )
        elif isinstance(i, File):
            path = await i.get_file()
            name = i.name or os.path.basename(path)
            await self._send_media_with_action(
                self.client,
                ChatAction.UPLOAD_DOCUMENT,
                self.client.send_document,
                user_name=user_name,
                document=path,
                filename=name,
                **cast(Any, payload),
            )
        elif isinstance(i, Record):
            path = await i.convert_to_file_path()
            await self._send_voice_with_fallback(
                self.client,
                path,
                payload,
                caption=i.text or None,
                user_name=user_name,
                message_thread_id=message_thread_id,
                use_media_action=True,
            )
        elif isinstance(i, Video):
            path = await i.convert_to_file_path()
            await self._send_media_with_action(
                self.client,
                ChatAction.UPLOAD_VIDEO,
                self.client.send_video,
                user_name=user_name,
                video=path,
                **cast(Any, payload),
            )
        else:
            logger.warning(f"Unsupported message type: {type(i)}")
```

Then in both streaming functions:

```python
# in _send_streaming_draft
async for chain in generator:
    if isinstance(chain, MessageChain):
        if chain.type == "break":
            # existing break handling...
            ...
            continue

        await self._process_message_chain_items(
            chain,
            payload,
            user_name,
            message_thread_id,
            append_text=lambda t: delta.__iadd__(t),  # or a small wrapper
        )
```

```python
# in _send_streaming_edit
async for chain in generator:
    if isinstance(chain, MessageChain):
        if chain.type == "break":
            # existing break handling...
            ...
            continue

        await self._process_message_chain_items(
            chain,
            payload,
            user_name,
            message_thread_id,
            append_text=lambda t: delta.__iadd__(t),
        )
```

This keeps all behavior but removes the large duplicated `Plain/Image/File/Record/Video` branches, making both methods focused on how text is streamed (draft vs edit).


### 2. Simplifying the draft sender loop (no restart per segment)

You can keep a single sender loop for the entire `_send_streaming_draft` call and avoid cancelling/restarting the task on each `break`. Let the loop:

- watch a `current_draft_id`
- send whatever `delta` contains at fixed intervals
- stop only once at the end of the generator

On `break`, you only need to send the final real message, clear `delta`, and update `current_draft_id`; the loop picks up the new state automatically.

```python
async def _send_streaming_draft(...):
    draft_id = _allocate_draft_id()
    delta = ""
    last_sent_text = ""
    send_interval = 0.5
    done = False  # generator finished

    async def _draft_sender_loop() -> None:
        nonlocal last_sent_text, draft_id
        while not done:
            await asyncio.sleep(send_interval)
            if delta and delta != last_sent_text:
                draft_text = delta[: self.MAX_MESSAGE_LENGTH]
                if draft_text != last_sent_text:
                    try:
                        await self._send_message_draft(
                            user_name,
                            draft_id,           # always use latest draft_id
                            draft_text,
                            message_thread_id,
                        )
                        last_sent_text = draft_text
                    except Exception:
                        pass

    sender_task = asyncio.create_task(_draft_sender_loop())
    try:
        async for chain in generator:
            if not isinstance(chain, MessageChain):
                continue

            if chain.type == "break":
                # flush current segment as real message
                if delta:
                    await self._send_final_segment(delta, payload)
                # reset state for next segment; loop keeps running
                delta = ""
                last_sent_text = ""
                draft_id = _allocate_draft_id()
                continue

            await self._process_message_chain_items(
                chain,
                payload,
                user_name,
                message_thread_id,
                append_text=lambda t: delta.__iadd__(t),
            )
    finally:
        done = True
        await sender_task

    if delta:
        await self._send_final_segment(delta, payload)

async def _send_final_segment(self, delta: str, payload: dict[str, Any]) -> None:
    try:
        markdown_text = telegramify_markdown.markdownify(
            delta,
            normalize_whitespace=False,
        )
        await self.client.send_message(
            text=markdown_text,
            parse_mode="MarkdownV2",
            **cast(Any, payload),
        )
    except Exception as e:
        logger.warning(f"Markdown conversion failed, using plain text: {e!s}")
        await self.client.send_message(text=delta, **cast(Any, payload))
```

This keeps the “periodically send latest buffer” semantics and segment-by-segment `draft_id` updates, but removes:

- `streaming_done` toggling
- task cancellation / recreation on each `break`
- duplicated “send final markdown vs plain text” logic (moved to `_send_final_segment`).


### 3. Localizing draft ID state

If possible in your object model, you can avoid the module‑level `global` counter by attaching it to the class/instance (still preserves wraparound behavior):

```python
class TelegramPlatformEvent(AstrMessageEvent):
    _TELEGRAM_DRAFT_ID_MAX = 2_147_483_647
    _next_draft_id: int = 0  # class-level or move to __init__ as self._next_draft_id

    @classmethod
    def _allocate_draft_id(cls) -> int:
        cls._next_draft_id = (
            1 if cls._next_draft_id >= cls._TELEGRAM_DRAFT_ID_MAX else cls._next_draft_id + 1
        )
        return cls._next_draft_id
```

Then in `_send_streaming_draft`:

```python
draft_id = self._allocate_draft_id()
...
draft_id = self._allocate_draft_id()
```

This removes `global` and makes the draft ID evolution easier to reason about (and to override/mock in tests) while keeping the same integer behavior.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Mar 4, 2026
- Extract shared helpers
- Trigger streaming sends when new tokens arrive
- Clear the draft content once generation finishes
- Send drafts in Markdown format by default
@Soulter Soulter changed the title from "feat(telegram): use the sendMessageDraft API for private-chat streaming output" to "feat(telegram): supports sendMessageDraft API" Mar 4, 2026
@Soulter Soulter merged commit 9683abe into AstrBotDevs:master Mar 4, 2026
7 checks passed
astrbot-doc-agent bot pushed a commit to AstrBotDevs/AstrBot-docs that referenced this pull request Mar 4, 2026
@astrbot-doc-agent

Generated docs update PR (pending manual review):
AstrBotDevs/AstrBot-docs#160
Trigger: PR merged


AI change summary:

  • Updated zh/platform/telegram.md: added a "Streaming Output" section describing the typewriter effect via the sendMessageDraft API in private chats, the group-chat fallback, and the dependency version requirement.
  • Updated en/platform/telegram.md: added the matching "Streaming Output" section, consistent with the Chinese docs.
  • i18n: both the Chinese and English docs were updated in sync.

Experimental bot notice:

  • This output is generated by AstrBot-Doc-Agent for review only.
  • It does not represent the final documentation form.

Soulter added a commit that referenced this pull request Mar 5, 2026
* feat(telegram): use the sendMessageDraft API for private-chat streaming output

- Add a _send_message_draft method wrapping the Telegram Bot API sendMessageDraft
- Private-chat streaming uses sendMessageDraft to push draft animations; group chats keep the edit_message_text fallback
- An independent asynchronous sender loop (_draft_sender_loop) pushes the latest buffer content at a fixed interval,
  fully decoupling token arrival rate from API network latency
- After streaming ends, a real message is sent to preserve the final content (drafts are temporary)
- A module-level incrementing draft_id replaces random generation, keeping the Telegram-side animation continuous

* fix(telegram): convert draft text to Markdown before sending message draft

* chore(telegram): refactor the telegram adapter

- Extract shared helpers
- Trigger streaming sends when new tokens arrive
- Clear the draft content once generation finishes
- Send drafts in Markdown format by default

* style(telegram): ruff format

* style(telegram): ruff check

---------

Co-authored-by: Soulter <905617992@qq.com>
@lekoOwO

lekoOwO commented Mar 20, 2026

@camera-2018 Hi, Bot API v9.5 now supports sendMessageDraft in group chats and channels.
Would you consider dropping the fallback to editMessageText? Thanks

@camera-2018
Contributor Author

@camera-2018 Hi, Bot API v9.5 now supports sendMessageDraft in group chats and channels.
Would you consider dropping the fallback to editMessageText? Thanks

GET! I'll support this feature this weekend 😗

@lekoOwO

lekoOwO commented Mar 20, 2026

I just had Copilot open #6666, but I haven't worked out how to test it yet :3

@camera-2018
Contributor Author

I just had Copilot open #6666, but I haven't worked out how to test it yet :3

You need to pull the project and run python main.py locally to test :3

