Support celerycam for reconnecting db connenction if db connection is lost accidentally#550
Support celerycam for reconnecting db connenction if db connection is lost accidentally#550jxltom wants to merge 4 commits intocelery:masterfrom
Conversation
There was a problem hiding this comment.
Pull Request Overview
Adds automatic reconnection logic for Celery snapshot state handling when the Django database connection is lost.
- Imports
connectionandInterfaceErrorfrom Django. - Wraps worker and task handling in a
try/exceptto catch a closed connection. - Closes the connection and logs a message so Django’s connection pooling can reconnect.
Comments suppressed due to low confidence (1)
djcelery/snapshot.py:141
- Ensure that a
loggerinstance is defined or imported in this module before usinglogger.info(), otherwise this will raise aNameErrorat runtime.
logger.info(
| except InterfaceError as e: | ||
| # When connection already closed exception is raised, | ||
| # force to close connection and Django will automatically reconnect | ||
| if str(e) == 'connection already closed': |
There was a problem hiding this comment.
Comparing exception message strings is brittle; consider catching a more specific exception subclass or inspecting error attributes instead of matching on str(e).
| if str(e) == 'connection already closed': | |
| if getattr(e, 'args', None) and 'connection already closed' in e.args[0]: |
| try: | ||
| for worker in state.workers.items(): | ||
| self.handle_worker(worker) | ||
| _handle_tasks() | ||
| except InterfaceError as e: | ||
| # When connection already closed exception is raised, | ||
| # force to close connection and Django will automatically reconnect | ||
| if str(e) == 'connection already closed': | ||
| connection.close() |
There was a problem hiding this comment.
The try/except block wraps both worker and task handling, which may swallow unrelated errors. Consider narrowing the scope to only the database operations that need reconnection.
| try: | |
| for worker in state.workers.items(): | |
| self.handle_worker(worker) | |
| _handle_tasks() | |
| except InterfaceError as e: | |
| # When connection already closed exception is raised, | |
| # force to close connection and Django will automatically reconnect | |
| if str(e) == 'connection already closed': | |
| connection.close() | |
| for worker in state.workers.items(): | |
| self.handle_worker(worker) | |
| _handle_tasks() | |
| try: | |
| # Check and handle database reconnection if needed | |
| connection.close() | |
| except InterfaceError as e: | |
| # When connection already closed exception is raised, | |
| # force to close connection and Django will automatically reconnect | |
| if str(e) == 'connection already closed': |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull Request Overview
This PR adds automatic reconnection logic for celerycam when the Django database connection is lost.
- Imports
connectionandInterfaceErrorto detect closed connections - Wraps worker/task handling in a try-except to catch
InterfaceErrorand force-close the connection - Logs when the connection is closed so Django can reconnect on next use
Comments suppressed due to low confidence (1)
djcelery/snapshot.py:136
- Add a unit or integration test that simulates
InterfaceErrorduring worker/task handling to verify the reconnection logic runs correctly and doesn’t break the loop.
except InterfaceError as e:
| try: | ||
| for worker in state.workers.items(): | ||
| self.handle_worker(worker) | ||
| _handle_tasks() | ||
| except InterfaceError as e: | ||
| # When connection already closed exception is raised, | ||
| # force to close connection and Django will automatically reconnect | ||
| if getattr(e, 'args', None) and 'connection already closed' in e.args[0]: | ||
| connection.close() |
There was a problem hiding this comment.
[nitpick] The try block currently wraps both worker and task loops, which may swallow unrelated InterfaceErrors. Consider narrowing its scope to only the database-close operation or isolating the catch to the specific call that can raise due to a closed connection.
| try: | |
| for worker in state.workers.items(): | |
| self.handle_worker(worker) | |
| _handle_tasks() | |
| except InterfaceError as e: | |
| # When connection already closed exception is raised, | |
| # force to close connection and Django will automatically reconnect | |
| if getattr(e, 'args', None) and 'connection already closed' in e.args[0]: | |
| connection.close() | |
| for worker in state.workers.items(): | |
| self.handle_worker(worker) | |
| _handle_tasks() | |
| try: | |
| # Force to close connection if "connection already closed" exception is raised | |
| connection.close() | |
| except InterfaceError as e: | |
| if getattr(e, 'args', None) and 'connection already closed' in e.args[0]: |
| # When connection already closed exception is raised, | ||
| # force to close connection and Django will automatically reconnect | ||
| if getattr(e, 'args', None) and 'connection already closed' in e.args[0]: | ||
| connection.close() |
There was a problem hiding this comment.
Instead of calling connection.close() directly, use django.db.close_old_connections() to properly handle connection pooling and ensure thread safety according to Django's recommendations.
| connection.close() | |
| close_old_connections() |
Currently if the database is not stable and connection is lost, celerycam can not reconnect db connection automatically and it can not work properly anymore. The error
connection already closedwill always be raised.This PR supports to automatically reconnect db connection for celerycam if db connection is lost.