fix(agentic): reasoning model token tracking, event-loop safety, stre…#989

Merged
brajrajnagar merged 1 commit into master from AgenticModelClass_update
Mar 16, 2026

Conversation

@brajrajnagar (Contributor)

Summary

  • Fix token tracking for reasoning models: _add_tokens now derives
    completion_tokens as total_tokens - prompt_tokens when total_tokens is
    available. Reasoning models (o1, o3) exclude reasoning tokens from
    completion_tokens but include them in total_tokens, so this ensures
    reasoning tokens are counted in the reported usage.
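The derivation above can be sketched as a standalone function (illustrative names, not the actual `_add_tokens` implementation):

```python
def derive_completion_tokens(prompt_tokens, completion_tokens, total_tokens):
    """Prefer total - prompt when a total is reported, so reasoning tokens
    (excluded from completion_tokens by o1/o3-style models) are counted."""
    if total_tokens is not None:
        return total_tokens - prompt_tokens
    return completion_tokens or 0

# Reasoning model: 100 visible completion tokens plus 400 hidden reasoning tokens.
print(derive_completion_tokens(50, 100, 550))   # 500 (reasoning included)
print(derive_completion_tokens(50, 100, None))  # 100 (fallback to reported value)
```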

  • Fix streaming token double-counting: Added _stream_with_nulled_usage
    wrapper that nulls out usage on all but the final usage-bearing chunk.
    Providers like Gemini emit a usage object on every stream chunk, causing
    token counts to be multiplied by the number of chunks.

  • Fix token collection for MCP streaming: Token accumulation during MCP
    streaming happens inside the pool's background event loop thread (via
    _sync_to_async_iter). Replaced the _finalize_tokens() call (which reads
    from the wrong thread's _thread_local) with pool._run_async(_clear_bg_tokens())
    to correctly collect tokens from the background thread. Added _drain_tokens()
    to atomically read-and-reset the accumulator.
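A minimal sketch of the read-and-reset behavior described above, assuming a thread-local dict accumulator (names are illustrative, not the actual class):

```python
import threading

class TokenAccumulator:
    """Per-thread token counts with a drain (read-and-reset) operation."""

    def __init__(self):
        self._thread_local = threading.local()

    def _tokens(self):
        # Lazily initialize the accumulator for the calling thread.
        if not hasattr(self._thread_local, "tokens"):
            self._thread_local.tokens = {"prompt": 0, "completion": 0}
        return self._thread_local.tokens

    def add(self, prompt, completion):
        t = self._tokens()
        t["prompt"] += prompt
        t["completion"] += completion

    def drain_tokens(self):
        """Return the accumulated counts and zero the accumulator, so the
        same tokens can never be reported twice by the owning thread."""
        t = self._tokens()
        drained = dict(t)
        t["prompt"] = 0
        t["completion"] = 0
        return drained

acc = TokenAccumulator()
acc.add(10, 20)
print(acc.drain_tokens())  # {'prompt': 10, 'completion': 20}
print(acc.drain_tokens())  # {'prompt': 0, 'completion': 0}
```

Because the dict is thread-local, each thread drains only its own counts; the PR routes the drain through the pool's background loop so the right thread's accumulator is read.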

  • Fix event loop blocking in streaming: Added _sync_to_async_iter which
    wraps sync iterators (OpenAI streaming responses) to run each next() call in
    a thread pool executor, preventing the background event loop thread from
    blocking during I/O waits.
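The bridge described above can be sketched with `loop.run_in_executor` offloading each blocking `next()` call (a simplified stand-in, not the actual `_sync_to_async_iter`):

```python
import asyncio

async def sync_to_async_iter(sync_iter, loop=None):
    """Wrap a blocking iterator so each next() runs in the default
    thread-pool executor, keeping the event loop free during I/O waits."""
    loop = loop or asyncio.get_running_loop()
    sentinel = object()
    it = iter(sync_iter)
    while True:
        # next() may block on network I/O; run it off the loop thread.
        item = await loop.run_in_executor(None, next, it, sentinel)
        if item is sentinel:
            break
        yield item

async def main():
    out = [x async for x in sync_to_async_iter(range(3))]
    print(out)  # [0, 1, 2]

asyncio.run(main())
```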

  • Add reasoning_effort support: Passed through to _create_stream_request
    and _stream_chat_with_tools so reasoning-capable models honor the parameter
    end-to-end.

  • Preserve extended tool call attributes: _accumulate_tool_delta now uses
    model_dump to capture all fields generically (e.g. thought_signature from
    reasoning models) instead of only copying id/type/function.
    _finalize_tool_calls preserves all accumulated attributes accordingly.
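The generic field capture could look like the sketch below; `ToolCallDelta` is a stand-in for the provider's Pydantic delta object, and `thought_signature` represents any provider-specific extra field:

```python
class ToolCallDelta:
    """Stand-in for a provider delta object; the real code calls Pydantic's
    model_dump() on streamed tool-call deltas."""

    def __init__(self, **fields):
        self._fields = fields

    def model_dump(self, exclude_none=True):
        return {k: v for k, v in self._fields.items()
                if not (exclude_none and v is None)}

def accumulate_tool_delta(acc, delta):
    """Merge a streamed delta into the accumulator generically, so extra
    fields (e.g. thought_signature) survive instead of being dropped."""
    for key, value in delta.model_dump(exclude_none=True).items():
        if key == "function":
            func = acc.setdefault("function", {"name": None, "arguments": ""})
            if value.get("name"):
                func["name"] = value["name"]
            func["arguments"] += value.get("arguments", "")
        else:
            acc[key] = value
    return acc

acc = {}
accumulate_tool_delta(acc, ToolCallDelta(
    id="call_1", type="function",
    function={"name": "search", "arguments": '{"q":'},
    thought_signature="sig-abc"))
accumulate_tool_delta(acc, ToolCallDelta(function={"arguments": '"cats"}'}))
print(acc["function"]["arguments"])  # {"q":"cats"}
print(acc["thought_signature"])      # sig-abc
```

Iterating over `model_dump()` output instead of hard-coding `id`/`type`/`function` is what lets unknown fields pass through untouched.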

  • Refactor: Extracted _extract_tool_content and _normalize_input_items
    to eliminate duplicated inline logic across all four _execute_*_tools*
    methods and two input-normalization sites.

  • Verification: validated the changes with Gemini 3.1 Flash Lite and the
    GPT-5.4 wrapper.

@github-actions

Code Coverage

Package                               Line Rate
clarifai                              45%
clarifai.cli                          61%
clarifai.cli.templates                67%
clarifai.cli.templates.toolkits       100%
clarifai.client                       65%
clarifai.client.auth                  67%
clarifai.constants                    100%
clarifai.datasets                     100%
clarifai.datasets.export              69%
clarifai.datasets.upload              75%
clarifai.datasets.upload.loaders      37%
clarifai.models                       100%
clarifai.rag                          0%
clarifai.runners                      52%
clarifai.runners.models               58%
clarifai.runners.pipeline_steps       39%
clarifai.runners.pipelines            72%
clarifai.runners.utils                62%
clarifai.runners.utils.data_types     72%
clarifai.schema                       100%
clarifai.urls                         58%
clarifai.utils                        65%
clarifai.utils.evaluation             16%
clarifai.workflows                    95%
Summary                               60% (11373 / 18970)

Minimum allowed line rate is 50%

Copilot AI (Contributor) left a comment


Pull request overview

Improves AgenticModelClass streaming/tool-loop behavior by fixing token accounting (especially for reasoning models and streaming providers that emit usage per chunk) and addressing event-loop safety when consuming sync streaming iterators.

Changes:

  • Adjust token accumulation to derive completion_tokens from total_tokens - prompt_tokens and add drain/reset semantics for thread-local token tracking.
  • Add streaming helpers to prevent usage double-counting and to avoid blocking the background event loop when iterating sync streaming responses.
  • Refactor repeated tool-result extraction and responses input normalization into shared helpers; expand tests accordingly.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File                                       Description
clarifai/runners/models/agentic_class.py   Token tracking fixes, streaming wrappers/executor bridging, helper refactors, and MCP streaming token finalization changes.
tests/runners/test_agentic_class.py        Updates and expands tests for token draining/finalization and new helper methods; adjusts MCP streaming test expectations.
Comments suppressed due to low confidence (1)

tests/runners/test_agentic_class.py:78

  • The updated token-tracking logic prefers total_tokens - prompt_tokens when total_tokens is present, but the tests currently only cover the total_tokens is None path. Add a test case where total_tokens is set (and completion_tokens differs) to ensure reasoning-token-inclusive accounting behaves as intended.
    def test_add_tokens_from_usage(self, model):
        """Test adding tokens from response with usage attribute."""
        mock_response = MagicMock()
        mock_usage = MagicMock()
        mock_usage.prompt_tokens = 10
        mock_usage.completion_tokens = 20
        mock_usage.total_tokens = None
        mock_response.usage = mock_usage

        model._add_tokens(mock_response)
        assert model._thread_local.tokens['prompt'] == 10
        assert model._thread_local.tokens['completion'] == 20
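A self-contained sketch of the case the reviewer asks for, using a fake model that implements the fixed derivation (the real test would use the project's `model` fixture instead):

```python
import threading
from unittest.mock import MagicMock

class FakeAgenticModel:
    """Minimal stand-in implementing the fixed _add_tokens derivation."""

    def __init__(self):
        self._thread_local = threading.local()
        self._thread_local.tokens = {'prompt': 0, 'completion': 0}

    def _add_tokens(self, response):
        usage = response.usage
        prompt = usage.prompt_tokens or 0
        total = usage.total_tokens
        # Prefer total - prompt so reasoning tokens are included.
        completion = (total - prompt) if total is not None else (usage.completion_tokens or 0)
        self._thread_local.tokens['prompt'] += prompt
        self._thread_local.tokens['completion'] += completion

model = FakeAgenticModel()
mock_response = MagicMock()
mock_response.usage.prompt_tokens = 10
mock_response.usage.completion_tokens = 20   # excludes reasoning tokens
mock_response.usage.total_tokens = 100       # includes reasoning tokens

model._add_tokens(mock_response)
print(model._thread_local.tokens)  # {'prompt': 10, 'completion': 90}
```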

"""Yield chunks with usage=None on all but the last usage-bearing chunk.

def _create_stream_request(self, messages, tools, max_tokens, temperature, top_p):
Some providers(Gemini) send a usage object on every chunk; keeping only the last one
Comment on lines +805 to +823
def _stream_with_nulled_usage(self, chunks):
"""Yield chunks with usage=None on all but the last usage-bearing chunk.

def _create_stream_request(self, messages, tools, max_tokens, temperature, top_p):
Some providers(Gemini) send a usage object on every chunk; keeping only the last one
avoids accumulating duplicated token counts across the multi-turn tool loop.
We buffer the most recent usage chunk and null out all earlier ones before yielding.
"""
self.buffered_usage_chunk = None
for chunk in chunks:
if getattr(chunk, "usage", None) is not None:
if self.buffered_usage_chunk is not None:
# Null out earlier usage so _set_usage only counts the final summary.
self.buffered_usage_chunk.usage = None
yield self.buffered_usage_chunk
self.buffered_usage_chunk = chunk
else:
yield chunk
if self.buffered_usage_chunk is not None:
yield self.buffered_usage_chunk
Comment on lines +467 to +473

    completion_tokens = (
        (total_tokens - prompt_tokens)
        if total_tokens is not None
        else (
            getattr(usage, 'completion_tokens', 0) or getattr(usage, 'output_tokens', 0) or 0
        )
    )
    # empty accumulator.
    bg_tokens = pool._run_async(self._clear_bg_tokens())
    if bg_tokens['prompt'] > 0 or bg_tokens['completion'] > 0:
        logger.info(
@brajrajnagar brajrajnagar merged commit e9510d1 into master Mar 16, 2026
15 checks passed
@brajrajnagar brajrajnagar deleted the AgenticModelClass_update branch March 16, 2026 15:48