
Conversation

@trevor-e (Member) commented Nov 6, 2025

Goal: we are processing third-party apps and can't guarantee the input will always process cleanly; it's fine and expected that some processing jobs will fail. We do, however, need guarantees that jobs will only take a certain amount of time and not clog up the system for everyone else.

For context, we ran into an edge case where our lief library essentially deadlocked our topic. It wasn't technically deadlocked, but it would have taken several hours to finish processing the message. Fixing this in prod once it happened was rather painful, since we had to skip over that message. Instead, we want a hard processing timeout per message.

I originally looked into supporting timeouts per message batch in Arroyo itself, but that runs into an undesirable case: we don't know exactly which messages in a batch succeeded or failed, so we might erroneously send successful messages to the DLQ. This might be fine, but I'm not sure what idempotency guarantees Arroyo makes. In our case it doesn't matter since we always set the batch size to 1, but since this is a general-purpose library I could see the behavior being confusing/unexpected for others. The streaming team mentioned we could have per-message timeouts, but it might be a larger undertaking/refactor to achieve that.

Instead, in the meantime, we can implement timeouts in our subclass. This works by overriding the poll() method, recording the start times of batches, and then checking for batches that exceed our timeout threshold (see the sketch below). Since we are the only team requesting this feature, we can live with this for now until it's potentially supported upstream. I tried a few other approaches, such as running the processing logic in a thread with a join timeout, but those never triggered.
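For illustration, here is a minimal sketch of that shape, not the actual diff from this PR: a subclass that remembers when a batch started and checks the elapsed time on every poll(). The class name, the `batch_timeout` parameter, the `_handle_timed_out_batch()` helper, and the import path are assumptions for the example; the real handling reaches into the strategy's internals as shown further down.

```python
import logging
import time
from typing import Optional

# Import path assumed; RunTaskWithMultiprocessing is also re-exported from
# arroyo.processing.strategies in recent Arroyo releases.
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    RunTaskWithMultiprocessing,
)

logger = logging.getLogger(__name__)


class RunTaskWithTimeout(RunTaskWithMultiprocessing):
    """RunTaskWithMultiprocessing plus a hard per-batch timeout (sketch)."""

    def __init__(self, *args, batch_timeout: float = 720.0, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._batch_timeout = batch_timeout
        self._batch_started_at: Optional[float] = None

    def submit(self, message) -> None:
        super().submit(message)  # the strategy may apply backpressure here
        # Remember when work for the current batch began (our batch size is 1).
        if self._batch_started_at is None:
            self._batch_started_at = time.monotonic()

    def poll(self) -> None:
        if self._batch_started_at is not None:
            elapsed = time.monotonic() - self._batch_started_at
            if elapsed > self._batch_timeout:
                logger.error(
                    "Batch exceeded timeout of %.1fs (elapsed=%.2fs).",
                    self._batch_timeout,
                    elapsed,
                )
                self._handle_timed_out_batch()
                self._batch_started_at = None
        # NOTE: clearing _batch_started_at when a batch finishes successfully
        # also requires looking at the strategy's internal batch state, which
        # is omitted from this sketch.
        super().poll()

    def _handle_timed_out_batch(self) -> None:
        # Placeholder for the real handling: terminate the stuck worker process
        # and record the offending message so it is routed to the DLQ.
        raise NotImplementedError
```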

Testing this with the stuck build in question, I now correctly see:

[17:22:13] ERROR    Batch exceeded timeout of 720.0s (elapsed=720.70s).
           DEBUG    Worker process exited normally (SIGCHLD 20)
           DEBUG    Worker process exited normally (SIGCHLD 20)
           INFO     Terminated worker and sent 1 messages to DLQ

try:
    # Access Arroyo's name-mangled private state on RunTaskWithMultiprocessing.
    processes = self._RunTaskWithMultiprocessing__processes
    pool = self._RunTaskWithMultiprocessing__pool
    invalid_messages = self._RunTaskWithMultiprocessing__invalid_messages
except AttributeError:
@trevor-e (Member Author):

This is brittle, but we'll have alerting if something breaks around this. Timed-out messages are pretty rare, so we should have plenty of time to react if that happens.

message.value.offset,
TimeoutError(f"Batch processing exceeded {self._batch_timeout}s timeout"),
)
invalid_messages.append(invalid_msg)
@trevor-e (Member Author):

This part I wasn't entirely sure about, but I didn't think I could raise InvalidMessage from this part of the code.
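
For contrast, the usual Arroyo pattern (as far as I understand it) is to raise InvalidMessage from inside the task function so the strategy routes the message to the DLQ, which isn't an option when the timeout is only detected later in poll(). A hedged sketch follows; `process_payload` and `looks_valid` are placeholders, and the exact InvalidMessage signature should be checked against the Arroyo version in use.

```python
from arroyo.dlq import InvalidMessage
from arroyo.types import Message


def looks_valid(payload: object) -> bool:
    # Placeholder validation for the sketch.
    return payload is not None


def process_payload(message: Message) -> None:
    if not looks_valid(message.payload):
        # Raised from the task function, Arroyo catches this and routes the
        # message to the DLQ with the right partition/offset bookkeeping.
        raise InvalidMessage(message.value.partition, message.value.offset)
```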

@codecov (bot) commented Nov 6, 2025

Codecov Report

❌ Patch coverage is 6.97674% with 40 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.06%. Comparing base (ebde423) to head (6d43688).
⚠️ Report is 1 commit behind head on main.
✅ All tests successful. No failed tests found.

Files with missing lines | Patch % | Lines
src/launchpad/kafka.py | 6.97% | 40 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #461      +/-   ##
==========================================
- Coverage   80.26%   80.06%   -0.21%     
==========================================
  Files         159      159              
  Lines       13482    13524      +42     
  Branches     1428     1434       +6     
==========================================
+ Hits        10822    10828       +6     
- Misses       2122     2158      +36     
  Partials      538      538              


@chromy (Contributor) left a comment:

Stamping in case we need to land this in order to get timeouts working. As discussed in DMs, maybe we're not getting a lot out of RunTaskWithMultiprocessing at this point (we're not using batching or pools, and the timeout logic has to reach into its internals), but then there is some bookkeeping to take on if we write our own Strategy.
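
For reference, the bookkeeping in question is roughly the ProcessingStrategy surface sketched below, assuming Arroyo's abstract interface (submit/poll/join/close/terminate); the class name and comments are placeholders, not a proposed implementation.

```python
from typing import Optional, TypeVar

# Import path assumed; ProcessingStrategy is Arroyo's abstract strategy interface.
from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import Message

TPayload = TypeVar("TPayload")


class PerMessageTimeoutStrategy(ProcessingStrategy[TPayload]):
    """Sketch of the surface a hand-rolled strategy has to cover."""

    def submit(self, message: Message[TPayload]) -> None:
        # Hand the message to a worker and remember its offset and start time.
        raise NotImplementedError

    def poll(self) -> None:
        # Check worker health and the timeout; forward/commit finished work.
        raise NotImplementedError

    def join(self, timeout: Optional[float] = None) -> None:
        # Drain in-flight work before shutdown, respecting the timeout.
        raise NotImplementedError

    def close(self) -> None:
        # Stop accepting new messages.
        raise NotImplementedError

    def terminate(self) -> None:
        # Tear down workers without waiting for in-flight work.
        raise NotImplementedError
```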

@trevor-e changed the title from "Add Arroyo batch timeout logic" to "[DNM] Add Arroyo batch timeout logic" on Nov 17, 2025