[DNM] Add Arroyo batch timeout logic #461
base: main
Conversation
```python
    processes = self._RunTaskWithMultiprocessing__processes
    pool = self._RunTaskWithMultiprocessing__pool
    invalid_messages = self._RunTaskWithMultiprocessing__invalid_messages
except AttributeError:
```
This is brittle, but we'll have alerting if something breaks around this. Timed-out messages are pretty rare, so we should have plenty of time to react in that case.
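The brittleness comes from Python's name mangling: attributes spelled `__processes` inside `RunTaskWithMultiprocessing` are stored as `_RunTaskWithMultiprocessing__processes`, so a subclass can still reach them under the mangled name, but any upstream rename silently breaks the access. A minimal illustration (the class bodies here are stand-ins, not Arroyo's actual internals):

```python
class RunTaskWithMultiprocessing:
    def __init__(self) -> None:
        # Stored as _RunTaskWithMultiprocessing__processes due to name mangling.
        self.__processes = ["p0", "p1"]

class TimeoutSubclass(RunTaskWithMultiprocessing):
    def peek_processes(self):
        try:
            # Reach into the parent's private state via the mangled name.
            return self._RunTaskWithMultiprocessing__processes
        except AttributeError:
            # Upstream renamed the attribute; degrade gracefully.
            return None

assert TimeoutSubclass().peek_processes() == ["p0", "p1"]
```

The `except AttributeError` in the diff above plays the same role: if Arroyo renames its internals, the subclass falls through rather than crashing.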
```python
    message.value.offset,
    TimeoutError(f"Batch processing exceeded {self._batch_timeout}s timeout"),
)
invalid_messages.append(invalid_msg)
```
This part I wasn't entirely sure about, but I didn't think I could raise InvalidMessage from this part of the code.
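The pattern in the diff, roughly, is to record the offending offset plus a `TimeoutError` and let the existing invalid-message bookkeeping route it to the DLQ, instead of raising from inside `poll()`. A hedged sketch of that shape (`InvalidMsg` is a stand-in dataclass, not Arroyo's actual `InvalidMessage` type, and `BATCH_TIMEOUT_S` is a hypothetical value):

```python
from dataclasses import dataclass

@dataclass
class InvalidMsg:
    offset: int
    reason: Exception

BATCH_TIMEOUT_S = 5.0  # hypothetical timeout, for illustration only

invalid_messages: list[InvalidMsg] = []

def mark_timed_out(offset: int) -> None:
    # Record the failure instead of raising; the consumer's existing
    # invalid-message path then handles DLQ routing.
    invalid_messages.append(
        InvalidMsg(
            offset,
            TimeoutError(f"Batch processing exceeded {BATCH_TIMEOUT_S}s timeout"),
        )
    )

mark_timed_out(42)
assert invalid_messages[0].offset == 42
```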
Codecov Report — ❌ Patch coverage is

```
@@           Coverage Diff            @@
##             main     #461    +/-  ##
==========================================
- Coverage   80.26%   80.06%   -0.21%
==========================================
  Files         159      159
  Lines       13482    13524     +42
  Branches     1428     1434      +6
==========================================
+ Hits        10822    10828      +6
- Misses       2122     2158     +36
  Partials      538      538
```
chromy left a comment
Stamping in case we need to land this in order to get timeouts working, as discussed in DMs. Maybe we're not using much of RunTaskWithMultiprocessing at this point (we're not using batching or pools, and the timeout logic has to reach into the internals), but then there is some bookkeeping to do if we make our own Strategy.
Goal: we are processing third-party apps and cannot guarantee the input will always cleanly succeed; it's fine and expected that some processing jobs will fail. We do, however, need some guarantee that jobs only take a certain amount of time and don't clog up the system for everyone else.
For context, we ran into an edge case where the lief library essentially deadlocked our topic. It wasn't technically deadlocked, but it would have taken several hours to finish processing the message. Fixing this in prod was rather painful once it happened, since we had to skip over that message. Instead, we want a hard processing timeout per message.

I originally looked into supporting timeouts per message batch in Arroyo itself; however, this runs into an undesirable case where we don't know exactly which messages in a batch succeeded or failed, and we might erroneously mark successful messages for the DLQ. This might be fine, but I'm not sure what idempotency guarantees Arroyo makes. In our case this doesn't matter since we always set the batch size to 1, but since Arroyo is a general-purpose library I could see this being confusing/unexpected behavior for others. The streaming team mentioned we could have per-message timeouts, but it might be a larger undertaking/refactor to achieve that.
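One of the approaches mentioned below, running the processing logic in a thread with a join timeout, fails for a fundamental reason: `join(timeout)` only stops *waiting*, it cannot stop the work. A minimal illustration (not the actual consumer code):

```python
import threading
import time

def slow_job(done: threading.Event) -> None:
    time.sleep(0.5)  # stands in for an hours-long parse of a pathological input
    done.set()

done = threading.Event()
t = threading.Thread(target=slow_job, args=(done,), daemon=True)
t.start()

t.join(timeout=0.1)       # give up waiting after 100 ms...
assert t.is_alive()       # ...but the worker thread is still running
assert not done.is_set()  # and its work has not been abandoned

t.join()                  # it only ends when the job itself finishes
assert done.is_set()
```

Python offers no safe way to kill a running thread, so the stuck message would still occupy the process; a real timeout needs to act on a separate worker process or mark the batch invalid from outside, which is what this PR does.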
Instead, in the meantime we can implement timeouts in our subclass. This works by overriding the `poll()` method, recording the start times of batches, and then checking for batches that exceed our timeout threshold. Since we are the only team requesting this feature, we can live with this for now until it's potentially supported upstream. I tried a few other approaches, such as running the processing logic in a thread with a join timeout, but those never trigger.

Testing this with the stuck build in question, I now correctly see:
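The `poll()`-override approach described above can be sketched as follows. `BaseStrategy` and its hooks are illustrative stand-ins for Arroyo's `RunTaskWithMultiprocessing`, not its real interface, and the DLQ routing is reduced to a list for clarity:

```python
import time

class BaseStrategy:
    def poll(self) -> None:
        pass  # upstream bookkeeping would happen here

class TimeoutStrategy(BaseStrategy):
    def __init__(self, batch_timeout: float) -> None:
        self._batch_timeout = batch_timeout
        self._batch_start_times: dict[int, float] = {}  # offset -> start time
        self.timed_out_offsets: list[int] = []

    def record_batch_start(self, offset: int) -> None:
        # Called when a batch is submitted for processing.
        self._batch_start_times[offset] = time.monotonic()

    def poll(self) -> None:
        super().poll()
        now = time.monotonic()
        for offset, started in list(self._batch_start_times.items()):
            if now - started > self._batch_timeout:
                # In the real change this marks the message invalid so it
                # is routed to the DLQ instead of blocking the topic.
                self.timed_out_offsets.append(offset)
                del self._batch_start_times[offset]

strategy = TimeoutStrategy(batch_timeout=0.05)
strategy.record_batch_start(offset=7)
time.sleep(0.1)
strategy.poll()
assert strategy.timed_out_offsets == [7]
```

Since `poll()` is called regularly by the consumer run loop, a stuck batch is detected within roughly one poll interval of crossing the threshold, which is enough for a coarse hard timeout.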