Fix asset event scheduling race condition and consistency #62501
dingo4dev wants to merge 4 commits into
Pull request overview
This PR attempts to fix a race condition in asset-based scheduling where AssetEvent records created by concurrent tasks can be missed by the Scheduler due to a visibility gap between database flush() and commit() operations. The fix synchronizes timestamps between AssetEvent and AssetDagRunQueue records by manually assigning timestamps and committing the asset event immediately.
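The visibility gap described above can be reproduced outside Airflow. The following is a minimal sketch, assuming SQLite from the Python standard library as a stand-in for the metadata database and a hypothetical `asset_event` table (not Airflow's schema). An uncommitted INSERT plays the role of `flush()`.

```python
import os
import sqlite3
import tempfile

# Hypothetical stand-in for the metadata database (not Airflow's schema).
db_path = os.path.join(tempfile.mkdtemp(), "visibility_demo.db")
task_conn = sqlite3.connect(db_path)       # plays the producing task
scheduler_conn = sqlite3.connect(db_path)  # plays the scheduler

task_conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, ts TEXT)")
task_conn.commit()


def visible_events(conn):
    """Count the rows this connection can currently see."""
    return conn.execute("SELECT COUNT(*) FROM asset_event").fetchall()[0][0]


# Comparable to flush(): the row is written inside a still-open transaction.
task_conn.execute("INSERT INTO asset_event (ts) VALUES ('2026-01-01T00:00:00')")
seen_before_commit = visible_events(scheduler_conn)

# Only commit() publishes the row to other connections.
task_conn.commit()
seen_after_commit = visible_events(scheduler_conn)

print(seen_before_commit, seen_after_commit)
```

The timestamp is fixed at insert time, but the second connection only sees the row after the commit, which is the window in which a scheduler pass could miss the event.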
Changes:
- Modified asset event registration to commit immediately after creating AssetEvent, ensuring early visibility to the scheduler
- Updated AssetDagRunQueue creation to use the asset event's timestamp for synchronization across both PostgreSQL and slow-path implementations
- Changed scheduler logic to conditionally create DAG runs only when asset events exist, and to delete ADRQ entries more selectively based on timestamps
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/jobs/scheduler_job_runner.py | Modified DAG run creation to be conditional on asset_events existence and updated ADRQ deletion to filter by timestamp |
| airflow-core/src/airflow/assets/manager.py | Changed from session.flush() to session.commit() for early AssetEvent persistence and updated ADRQ timestamp synchronization logic |
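
The two scheduler-side changes listed above (create a run only when events exist, delete queue rows selectively by timestamp) can be sketched as follows. This is an illustration only: the `adrq` table, its columns, and `process_queue` are hypothetical names, and SQLite stands in for the real database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified queue table (not Airflow's real schema).
conn.execute(
    "CREATE TABLE adrq (asset_id INTEGER, target_dag_id TEXT, created_at REAL)"
)
conn.executemany(
    "INSERT INTO adrq VALUES (?, ?, ?)",
    [(1, "consumer", 10.0), (2, "consumer", 12.5)],
)


def process_queue(conn, dag_id, triggered_at, asset_events):
    """Create a run only if events exist, then delete only the rows covered."""
    created_run = bool(asset_events)  # placeholder for the real run creation
    # Scoped delete: rows queued after triggered_at survive for the next pass.
    conn.execute(
        "DELETE FROM adrq WHERE target_dag_id = ? AND created_at <= ?",
        (dag_id, triggered_at),
    )
    return created_run


first_pass = process_queue(conn, "consumer", 10.0, ["AE1"])
remaining = conn.execute("SELECT asset_id, created_at FROM adrq").fetchall()
second_pass = process_queue(conn, "consumer", 12.5, [])
remaining_after = conn.execute("SELECT COUNT(*) FROM adrq").fetchall()[0][0]
```

In this sketch the second pass finds no events, so no run is created, while the queue row is still cleaned up.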
…rt of apache#62501)

Backports three targeted fixes to 2.10.4 to address the race condition reported in GH#56541 and GH#41101, where a dataset-dependent DAG fires with one or more upstream dataset events missing from consumed_dataset_events.

Root cause: stale `created_at` timestamps in DatasetDagRunQueue caused the scheduler's event-window query to exclude valid events, yet the presence-based readiness check still considered the DAG ready to run.

Changes:
- manager.py: replace ON CONFLICT DO NOTHING with ON CONFLICT DO UPDATE (Postgres) and add a WHERE guard to prevent backwards timestamp drift; pass explicit created_at=utcnow() on the non-Postgres merge path so existing rows are always refreshed.
- scheduler_job_runner.py: wrap create_dagrun in `if dataset_events:` to prevent phantom runs when the event window is empty; scope the DDRQ DELETE to `created_at <= exec_date` instead of deleting all rows, so events that arrived during processing are preserved for the next cycle.
- tests/datasets/test_manager.py: add WHERE-guard regression test for the Postgres upsert path.
- dev/BUG_REPORT_DATASET_SCHEDULING.md: detailed bug report with timeline diagrams, related issues (apache#56541, apache#41101, apache#35870), root cause analysis, and explanation of each fix.
- reproduce_bug_56541.py: end-to-end reproduction and verification script.

Made-with: Cursor
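
The upsert with a WHERE guard mentioned for manager.py can be illustrated with a small sketch. It assumes SQLite 3.24 or newer (whose upsert syntax is modeled on PostgreSQL's) as a stand-in for Postgres, and a hypothetical `ddrq` table; the strict `>` comparison is also an assumption, not taken from the patch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified queue table keyed like a dataset/DAG pair.
conn.execute(
    """
    CREATE TABLE ddrq (
        dataset_id INTEGER NOT NULL,
        target_dag_id TEXT NOT NULL,
        created_at REAL NOT NULL,
        PRIMARY KEY (dataset_id, target_dag_id)
    )
    """
)

# DO UPDATE refreshes an existing row; the WHERE guard rejects older timestamps.
UPSERT = """
    INSERT INTO ddrq (dataset_id, target_dag_id, created_at)
    VALUES (?, ?, ?)
    ON CONFLICT (dataset_id, target_dag_id)
    DO UPDATE SET created_at = excluded.created_at
    WHERE excluded.created_at > ddrq.created_at
"""


def queue_dag(dataset_id, dag_id, created_at):
    conn.execute(UPSERT, (dataset_id, dag_id, created_at))


def queued_at(dataset_id, dag_id):
    row = conn.execute(
        "SELECT created_at FROM ddrq WHERE dataset_id = ? AND target_dag_id = ?",
        (dataset_id, dag_id),
    ).fetchall()
    return row[0][0]


queue_dag(1, "consumer", 100.0)
queue_dag(1, "consumer", 105.0)  # newer event: timestamp is refreshed
queue_dag(1, "consumer", 101.0)  # older event arriving late: ignored by the guard
latest = queued_at(1, "consumer")
```

With DO NOTHING the row would have stayed at the first timestamp; with DO UPDATE and no guard, the late arrival would have moved it backwards.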
will try to take a look this week
Hi @dingo4dev, I wanted to ping you here if you missed my message on Slack. I am repeating my message and offer for help in here as well:
Hi @molcay, Thank you so much for checking in and offering help!
_______________________________________________________________
TestSchedulerJob.test_create_dag_runs_when_concurrent_asset_events_created
_______________________________________________________________
airflow-core/tests/unit/jobs/test_scheduler_job.py:5792:
in test_create_dag_runs_when_concurrent_asset_events_created
assert total_consumed_asset_events == ASSET_EVENT_COUNT
E assert 29 == 30
-------------------------------------------- Captured stdout setup -----------------------------------------
] Filling up the DagBag from /dev/null [airflow.dag_processing.dagbag.DagBag]
-------------------------------------------- Captured stdout call ------------------------------------------
Missing AssetEvent Metadata: {(16, '2026-06-23T05:02:05.983645+00:00')}
{'event_': (8, '2026-06-23T05:02:05.967063+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (9, '2026-06-23T05:02:05.972418+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (13, '2026-06-23T05:02:05.982971+00:00'), 'dagrun': (436, '2026-06-23T05:02:05.982971+00:00')}
{'event_': (16, '2026-06-23T05:02:05.983645+00:00'), 'dagrun': ()}
{'event_': (10, '2026-06-23T05:02:05.989838+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (15, '2026-06-23T05:02:05.994289+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (14, '2026-06-23T05:02:05.995763+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (11, '2026-06-23T05:02:06.025005+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (12, '2026-06-23T05:02:06.034978+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (18, '2026-06-23T05:02:06.059185+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (17, '2026-06-23T05:02:06.060623+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (19, '2026-06-23T05:02:06.099119+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (20, '2026-06-23T05:02:07.001801+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (21, '2026-06-23T05:02:07.022033+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (22, '2026-06-23T05:02:07.083870+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (23, '2026-06-23T05:02:07.090158+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (24, '2026-06-23T05:02:07.097476+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (25, '2026-06-23T05:02:07.116000+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (26, '2026-06-23T05:02:08.046920+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (27, '2026-06-23T05:02:08.054838+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (28, '2026-06-23T05:02:08.067990+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (29, '2026-06-23T05:02:08.092453+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (30, '2026-06-23T05:02:08.094004+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (31, '2026-06-23T05:02:08.118823+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (32, '2026-06-23T05:02:08.125311+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (33, '2026-06-23T05:02:08.127450+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (34, '2026-06-23T05:02:08.154315+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (35, '2026-06-23T05:02:09.073822+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (36, '2026-06-23T05:02:09.091265+00:00'), 'dagrun': (443, '2026-06-23T05:02:09.091265+00:00')}
{'event_': (37, '2026-06-23T05:02:09.118804+00:00'), 'dagrun': (444, '2026-06-23T05:02:09.118804+00:00')}
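
For readers of the log above, the "Missing AssetEvent Metadata" line can be derived from the per-event rows. The snippet below is a sketch of one way to do that, using three rows copied from the captured output; how the actual test computes it is not shown here, so the approach is an assumption.

```python
# Three rows copied from the captured output; an empty tuple means no Dag run
# consumed the event.
rows = [
    {"event_": (13, "2026-06-23T05:02:05.982971+00:00"),
     "dagrun": (436, "2026-06-23T05:02:05.982971+00:00")},
    {"event_": (16, "2026-06-23T05:02:05.983645+00:00"),
     "dagrun": ()},
    {"event_": (10, "2026-06-23T05:02:05.989838+00:00"),
     "dagrun": (434, "2026-06-23T05:02:05.989838+00:00")},
]

# Events without a consuming Dag run are the ones the assertion trips on.
missing = {row["event_"] for row in rows if not row["dagrun"]}
consumed_count = sum(1 for row in rows if row["dagrun"])

print("Missing AssetEvent Metadata:", missing)
```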
Hi @dingo4dev,
Lee-W left a comment
a few nits, but overall looks good
@@ -0,0 +1,24 @@
Fix a race condition that could cause asset-triggered Dags to be skipped
This is not user-facing; it is more of an internal improvement. I feel we don't need a new newsfragment for this. Even if we really want one, a bug newsfragment should be enough.
dags_to_queue: set[DagModel],
event: AssetEvent,
session: Session,
dialect_name: str | None,
I added this because the static check (mypy-airflow-core) failed before.
`dialect_name = get_dialect_name(session)` may return None, though.
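
A small sketch of why the `str | None` annotation matters: the dispatch below has to handle a missing dialect explicitly. The function name and the strategy labels are hypothetical and only mirror the Postgres, MySQL, and fallback paths mentioned elsewhere in this PR.

```python
from __future__ import annotations


def pick_queue_strategy(dialect_name: str | None) -> str:
    """Map a possibly-missing dialect name to a queuing strategy label."""
    if dialect_name == "postgresql":
        return "on_conflict_do_update"
    if dialect_name == "mysql":
        return "on_duplicate_key_update"
    # None (dialect unknown) and every other backend take the generic path.
    return "merge_fallback"


strategies = [pick_queue_strategy(name) for name in ("postgresql", "mysql", None)]
```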
Fixes an issue where the Scheduler misses AssetEvents due to a visibility gap between flush() and commit(). When multiple tasks create events for the same asset, the AssetEvent timestamp is generated during flush, but the record remains invisible to the Scheduler until the final commit. If the AssetDagRunQueue is processed in the interim, the 'late-committing' event is orphaned. The job scheduler reads the AssetDagRunQueue and uses the created_at column as the triggered_date, which is then used to fetch the asset events and to assign the dag `run_after`. This change ensures timestamps are synchronized and the transaction boundary is tightened to prevent data-aware scheduling misses.

Add check for asset events before creating asset-triggered DAG runs
This prevents double triggering when a late-committed asset_dag_run_queue entry exists but its asset event was already processed by a previous dag run.

Fix update statement condition in AssetDagRunQueue to use correct column reference

Add tests for concurrent asset events and no asset event scenarios in DAG run creation

linting tests

make mypy happy

Add check for existing asset DAG run queue entry before merge
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

add log when no dag run without asset event

Support concurrent asset event queuing on MySQL
Implement a MySQL-specific optimization using `ON DUPLICATE KEY UPDATE` when queuing asset-triggered DAG runs. This prevents unique constraint violations during concurrent asset events, mirroring the existing PostgreSQL implementation. The concurrency tests are also refactored to verify correct behavior across both database dialects.

Refactor asset event handling and improve session management in AssetManager

make lint happy

Refactor asset event handling to streamline session management in AssetManager

Re-attach asset_event to caller's session to prevent DetachedInstanceError

fix: prevent race condition causing lost events in asset-triggered DAG run creation
- Remove CTE lower-bound filter from asset event query: the bound excluded late-committed events that arrived after a concurrent DagRun was created, causing those events to be permanently lost and no follow-up DagRun created.
- Add 'not consumed' filter using association_table to prevent duplicate consumption of asset events across concurrent DagRuns.
- Always delete ADRQ rows after processing, even when no new DagRun is created, to prevent stale entries accumulating and causing infinite scheduler loops.
- Fix concurrent test to use seen_dr_ids set instead of prev_dr reference, ensuring all DagRuns are counted regardless of creation order.

fix: Persist AssetEvent in independent session for immediate visibility
Persist AssetEvent using a truly independent session (`scoped=False`) to ensure it's committed and visible to processes like the Scheduler before other transaction operations proceed. This closes a critical visibility gap for Asset-Triggered DagRuns (ADRQ). The event is then explicitly reloaded into the caller's session to prevent DetachedInstanceError and allow subsequent relationship operations to function correctly.

make static check happy

refactor: Consolidate `AssetEvent` persistence logic
Extract the complex logic for persisting `AssetEvent` instances into a new dedicated class method, `create_asset_event`. This centralizes the handling of distinct session management strategies (independent session for non-SQLite, direct flush for SQLite) and ensures immediate visibility of asset events to the scheduler. It also re-attaches the event to the caller's session to prevent `DetachedInstanceError`.

fix: Ensure DagRun updates and AssetEvents are visible across transaction boundaries
Add `session.flush()` after `DagRun` partition key updates to ensure these changes are persisted within the caller's transaction before an independent session for `AssetEvent` creation begins. This prevents stale `DagRun` data from being read.
Change `AssetEvent` creation to use `create_session(scoped=False)`, guaranteeing a truly independent session and immediate commit of the asset event. This ensures the event is visible to processes like the Scheduler for Asset-Triggered DagRuns (ADRQ) without transactional delays.

fix: Change log level from warning to info for DagRun creation status

apply copilot reviewer

test: Add backend markers for concurrent asset event tests

docs: Address asset event scheduling race condition by using independent session commits
revert unrelated change
Ensures the asset-triggered consumer DAG is registered in the session prior to creating the AssetEvent. This prevents potential test failures or incorrect behavior due to timing or visibility issues with DAG metadata, aligning the test setup with real-world scheduler requirements.
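
One of the commits above adds a "not consumed" filter through the association table. A rough sketch of that idea follows, assuming a hypothetical two-table schema and a single consumer Dag (real consumption is tracked per consuming Dag run, so this simplifies things), with SQLite standing in for the real database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema: events plus a run-to-event association table.
conn.executescript(
    """
    CREATE TABLE asset_event (id INTEGER PRIMARY KEY, asset_id INTEGER, ts REAL);
    CREATE TABLE dagrun_asset_event (dag_run_id INTEGER, event_id INTEGER);
    INSERT INTO asset_event VALUES (1, 7, 1.0), (2, 7, 2.0), (3, 7, 3.0);
    INSERT INTO dagrun_asset_event VALUES (100, 1), (100, 3);
    """
)

# Select events for an asset that no Dag run has consumed yet, with no
# lower bound on the timestamp, so late-committed events are still picked up.
UNCONSUMED = """
    SELECT e.id
    FROM asset_event AS e
    WHERE e.asset_id = ?
      AND NOT EXISTS (
          SELECT 1 FROM dagrun_asset_event AS link WHERE link.event_id = e.id
      )
    ORDER BY e.ts
"""

unconsumed_ids = [row[0] for row in conn.execute(UNCONSUMED, (7,)).fetchall()]
```

Event 2 is returned even though its timestamp lies between two already-consumed events, which a lower-bound filter would have excluded.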
This PR addresses a race condition in the Asset-based scheduling logic where AssetEvent records created by concurrent tasks can be "missed" or "orphaned" by the Scheduler.
The issue stems from the Visibility Gap between a database flush() (which generates the created_at timestamp) and the final commit() (which makes the record visible to other sessions).
This change ensures that the synchronization between the AssetEvent and the AssetDagRunQueue is atomic and that the timestamps are aligned, preventing the timestamp drift caused by performance issues.
related: #54659, #56750, #56749
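
The T0 to T6 sequence in this description can be replayed with a small simulation. This is a sketch under assumptions: events are selected with `previous run_after < timestamp <= ADRQ created_at`, which is my reading of the "AE1.timestamp < prev DAG run_after" note, and all names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssetEvent:
    name: str
    timestamp: float


def events_in_window(committed, lower, upper):
    """Events with lower < timestamp <= upper; lower=None means unbounded."""
    return [
        e for e in committed
        if (lower is None or e.timestamp > lower) and e.timestamp <= upper
    ]


ae1 = AssetEvent("AE1", 1.0)
ae2 = AssetEvent("AE2", 2.0)
committed = [ae1, ae2]  # T1, T2: both events are committed early

previous_run_after = None
dag_runs = []  # (run_after, consumed event names)
skipped = []   # ADRQ timestamps that produced no run

# T3/T4: the ADRQ row carries AE2's timestamp; T5/T6: the late row carries AE1's.
for adrq_created_at in (ae2.timestamp, ae1.timestamp):
    events = events_in_window(committed, previous_run_after, adrq_created_at)
    if events:
        dag_runs.append((adrq_created_at, [e.name for e in events]))
        previous_run_after = adrq_created_at
    else:
        # Guard: no events in the window, so no duplicate Dag run is created.
        skipped.append(adrq_created_at)
```

The first pass consumes both events in one run; the late queue row then finds an empty window and is skipped instead of triggering a second run.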
---
config:
  theme: redux-dark-color
  look: neo
---
sequenceDiagram
    participant J1 as Task Instance 1
    participant J2 as Task Instance 2
    participant JS as Job Scheduler
    participant DB as Database
    Note over J1,DB: T0
    Note left of J1: T1
    J1-->>DB: Asset Event 1 (AE1) & commit
    activate J1
    activate DB
    Note left of J1: T2
    J2-->>DB: Asset Event 2 (AE2) & commit
    activate J2
    activate DB
    Note left of J1: T3
    J2-->>-DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE2.timestamp<br/>(flush & commit)
    deactivate DB
    Note left of J1: T4
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS-->>DB: Fetch events (AE1, AE2)
    JS->>JS: Create DAG with run_after = ADRQ created_at (AE2.timestamp)
    deactivate DB
    deactivate JS
    Note left of J1: T5
    J1-->>DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE1.timestamp<br/>(flush & commit)
    deactivate J1
    deactivate DB
    Note left of J1: T6
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS->>JS: No event found<br/>(AE1.timestamp < prev DAG run_after)
    deactivate DB
    deactivate JS

Durability tradeoff (new failure mode)
On Postgres/MySQL the `AssetEvent` is committed in a short-lived independent session so it is visible to the scheduler before the producing task's transaction commits the alias association, `AssetDagRunQueue` rows, and TI state change. This is intentional and required to fix the scheduling race. The tradeoff is that a failure after the event is committed but before the caller's transaction commits can leave:
- … `AssetDagRunQueue` row for the scheduler to consume, or
- … `register_asset_change`.

Operators should be aware of this new failure mode when diagnosing missing or duplicate asset-triggered Dag runs. This is also documented in the newsfragment for this change.
Was generative AI tooling used to co-author this PR?
Copilot
Important
🛠️ Maintainer triage note for @dingo4dev · by @potiuk · 2026-06-17 14:51 UTC
Helpful heads-up from the maintainers. Please address the following before this PR can be reviewed:
- … CI image checks / Static checks). Run them locally with `prek run --all-files` (or `pre-commit run --all-files`) and push the fixes.
- … MySQL tests: core / DB-core:MySQL:8.0:3.10:Core...Serialization. Reproduce and fix locally, then push.

The ball is in your court: you've been assigned to this PR. Fix the above, then mark it Ready for review.
Automated triage — may be imperfect; a maintainer takes the next look.