Skip to content

Fix asset event scheduling race condition and consistency#62501

Open
dingo4dev wants to merge 4 commits into
apache:mainfrom
dingo4dev:fix/asset-triggered-dag-run
Open

Fix asset event scheduling race condition and consistency#62501
dingo4dev wants to merge 4 commits into
apache:mainfrom
dingo4dev:fix/asset-triggered-dag-run

Conversation

@dingo4dev

@dingo4dev dingo4dev commented Feb 26, 2026

Copy link
Copy Markdown
Contributor

This PR addresses a race condition in the Asset-based scheduling logic where AssetEvent records created by concurrent tasks can be "missed" or "orphaned" by the Scheduler.

The issue stems from the Visibility Gap between a database flush() (which generates the created_at timestamp) and the final commit() (which makes the record visible to other sessions).

This change ensures that the synchronization between the AssetEvent and the AssetDagRunQueue is atomic and that the timestamps are aligned to prevent the drift caused by performance issue.

  • Manually assigns the same timestamp to both the AssetEvent and the ADRQ before the flush.
  • Commit and populate the asset event record in database immediately, so that the event can be accessible by job scheduler
  • Prevent creating dag by empty asset event as the asset event is trigger in prev_dag_run

related: #54659, #56750, #56749

---
config:
  theme: redux-dark-color
  look: neo
---
sequenceDiagram
    participant J1 as Task Instance 1
    participant J2 as Task Instance 2
    participant JS as Job Scheduler
    participant DB as Database

    Note over J1,DB: T0

    Note left of J1: T1
    J1-->>DB: Asset Event 1 (AE1) & commit
    activate J1
    activate DB
    Note left of J1: T2
    J2-->>DB: Asset Event 2 (AE2) & commit
    activate J2
    activate DB

    Note left of J1: T3
    J2-->>-DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE2.timestamp<br/>(flush & commit)
    deactivate DB
    Note left of J1: T4
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS-->>DB: Fetch events (AE1, AE2)
    JS->>JS: Create DAG with run_after = ADRQ created_at (AE2.timestamp)
    deactivate DB
    deactivate JS
    Note left of J1: T5
    J1-->>DB: Asset Dag Run Queue (ADRQ)<br/>created_at = AE1.timestamp<br/>(flush & commit)
    deactivate J1
    deactivate DB
    Note left of J1: T6
    activate JS
    activate DB
    JS-->>DB: Fetch created_at from ADRQ
    JS->>JS: No event found<br/>(AE1.timestamp < prev DAG run_after)
    deactivate DB
    deactivate JS
Loading

Durability tradeoff (new failure mode)

On Postgres/MySQL the AssetEvent is committed in a short-lived independent session so it is visible to the scheduler before the producing task's transaction commits the alias association, AssetDagRunQueue rows, and TI state change. This is intentional and required to fix the scheduling race. The tradeoff is that a failure after the event is committed but before the caller's transaction commits can leave:

  • an orphaned asset event — persisted with no AssetDagRunQueue row for the scheduler to consume, or
  • a duplicate asset event — if the producing task retries register_asset_change.

Operators should be aware of this new failure mode when diagnosing missing or duplicate asset-triggered Dag runs. This is also documented in the newsfragment for this change.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    Copilot

Open with GitKraken

Important

🛠️ Maintainer triage note for @dingo4dev · by @potiuk · 2026-06-17 14:51 UTC

Helpful heads-up from the maintainers — please address before this PR can be reviewed:

  • Static / docs checks failing (CI image checks / Static checks). Run them locally with prek run --all-files (or pre-commit run --all-files) and push the fixes.
  • Failing test jobs: MySQL tests: core / DB-core:MySQL:8.0:3.10:Core...Serialization. Reproduce and fix locally, then push.
  • See the Pull Request quality criteria.

The ball is in your court — you've been assigned to this PR. Fix the above, then mark it Ready for review.

Automated triage — may be imperfect; a maintainer takes the next look.

Copilot AI review requested due to automatic review settings February 26, 2026 04:41
@boring-cyborg boring-cyborg Bot added the area:Scheduler including HA (high availability) scheduler label Feb 26, 2026
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch from 11ed83d to 624e1ea Compare February 26, 2026 04:41
@dingo4dev dingo4dev changed the title [WIP] Fix asset event scheduling race condition Fix asset event scheduling race condition Feb 26, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR attempts to fix a race condition in asset-based scheduling where AssetEvent records created by concurrent tasks can be missed by the Scheduler due to a visibility gap between database flush() and commit() operations. The fix synchronizes timestamps between AssetEvent and AssetDagRunQueue records by manually assigning timestamps and committing the asset event immediately.

Changes:

  • Modified asset event registration to commit immediately after creating AssetEvent, ensuring early visibility to the scheduler
  • Updated AssetDagRunQueue creation to use the asset event's timestamp for synchronization across both PostgreSQL and slow-path implementations
  • Changed scheduler logic to conditionally create DAG runs only when asset events exist, and to delete ADRQ entries more selectively based on timestamps

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.

File Description
airflow-core/src/airflow/jobs/scheduler_job_runner.py Modified DAG run creation to be conditional on asset_events existence and updated ADRQ deletion to filter by timestamp
airflow-core/src/airflow/assets/manager.py Changed from session.flush() to session.commit() for early AssetEvent persistence and updated ADRQ timestamp synchronization logic

Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/jobs/scheduler_job_runner.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 4 times, most recently from 2308c0b to 0019359 Compare February 27, 2026 02:42
@dingo4dev dingo4dev changed the title Fix asset event scheduling race condition Fix asset event scheduling race condition and consistency Feb 27, 2026
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch from 0019359 to e796ff5 Compare March 5, 2026 14:17
@dingo4dev dingo4dev requested review from Lee-W and uranusjr as code owners March 5, 2026 14:17
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch from e796ff5 to bcdd0c5 Compare March 6, 2026 02:03
@dingo4dev

Copy link
Copy Markdown
Contributor Author

@Lee-W @uranusjr Please take a time to review it for me. Thanks a lot ;p

joseotaviorf added a commit to joseotaviorf/airflow that referenced this pull request Mar 12, 2026
…rt of apache#62501)

Backports three targeted fixes to 2.10.4 to address the race condition reported
in GH#56541 and GH#41101, where a dataset-dependent DAG fires with one or more
upstream dataset events missing from consumed_dataset_events.

Root cause: stale `created_at` timestamps in DatasetDagRunQueue caused the
scheduler's event-window query to exclude valid events, yet the presence-based
readiness check still considered the DAG ready to run.

Changes:
- manager.py: replace ON CONFLICT DO NOTHING with ON CONFLICT DO UPDATE
  (Postgres) and add a WHERE guard to prevent backwards timestamp drift;
  pass explicit created_at=utcnow() on the non-Postgres merge path so
  existing rows are always refreshed.
- scheduler_job_runner.py: wrap create_dagrun in `if dataset_events:` to
  prevent phantom runs when the event window is empty; scope the DDRQ
  DELETE to `created_at <= exec_date` instead of deleting all rows, so
  events that arrived during processing are preserved for the next cycle.
- tests/datasets/test_manager.py: add WHERE-guard regression test for the
  Postgres upsert path.
- dev/BUG_REPORT_DATASET_SCHEDULING.md: detailed bug report with timeline
  diagrams, related issues (apache#56541, apache#41101, apache#35870), root cause analysis,
  and explanation of each fix.
- reproduce_bug_56541.py: end-to-end reproduction and verification script.

Made-with: Cursor
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch from bcdd0c5 to c7dae2d Compare March 17, 2026 06:36
@Lee-W

Lee-W commented Mar 17, 2026

Copy link
Copy Markdown
Member

will try to take a look this week

joseotaviorf added a commit to joseotaviorf/airflow that referenced this pull request Mar 19, 2026
…rt of apache#62501)

Backports three targeted fixes to 2.10.4 to address the race condition reported
in GH#56541 and GH#41101, where a dataset-dependent DAG fires with one or more
upstream dataset events missing from consumed_dataset_events.

Root cause: stale `created_at` timestamps in DatasetDagRunQueue caused the
scheduler's event-window query to exclude valid events, yet the presence-based
readiness check still considered the DAG ready to run.

Changes:
- manager.py: replace ON CONFLICT DO NOTHING with ON CONFLICT DO UPDATE
  (Postgres) and add a WHERE guard to prevent backwards timestamp drift;
  pass explicit created_at=utcnow() on the non-Postgres merge path so
  existing rows are always refreshed.
- scheduler_job_runner.py: wrap create_dagrun in `if dataset_events:` to
  prevent phantom runs when the event window is empty; scope the DDRQ
  DELETE to `created_at <= exec_date` instead of deleting all rows, so
  events that arrived during processing are preserved for the next cycle.
- tests/datasets/test_manager.py: add WHERE-guard regression test for the
  Postgres upsert path.
- dev/BUG_REPORT_DATASET_SCHEDULING.md: detailed bug report with timeline
  diagrams, related issues (apache#56541, apache#41101, apache#35870), root cause analysis,
  and explanation of each fix.
- reproduce_bug_56541.py: end-to-end reproduction and verification script.

@kaxil kaxil left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments

Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py
Comment thread airflow-core/src/airflow/jobs/scheduler_job_runner.py Outdated
Comment thread airflow-core/tests/unit/jobs/test_scheduler_job.py Outdated
Comment thread airflow-core/tests/unit/jobs/test_scheduler_job.py
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 2 times, most recently from e674f0f to 1a08e6f Compare March 22, 2026 19:58
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 2 times, most recently from 3a647bf to f0d5a65 Compare March 29, 2026 16:48
@kaxil kaxil requested a review from Copilot April 2, 2026 01:03
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 2 times, most recently from 3246f38 to 3190395 Compare June 2, 2026 09:49
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 4 times, most recently from d839e70 to 96725a7 Compare June 10, 2026 07:27
@dingo4dev dingo4dev marked this pull request as ready for review June 10, 2026 14:52
@dingo4dev dingo4dev force-pushed the fix/asset-triggered-dag-run branch 4 times, most recently from 3dff355 to 44cb946 Compare June 10, 2026 16:51
@potiuk potiuk marked this pull request as draft June 12, 2026 01:17
@Lee-W Lee-W removed this from the Airflow 3.3.0 milestone Jun 15, 2026
Comment thread airflow-core/newsfragments/62501.significant.rst
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py Outdated
Comment thread airflow-core/src/airflow/assets/manager.py
@molcay

molcay commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Hi @dingo4dev, I wanted to ping you here if you missed my message on Slack.

I am repeating my message and offer for help in here as well:

Hi @Stanley Law, hope you're having a good week!

I have been following your PR, it seems great stuff and thanks for tackling this. I know bouncing between reviews and CI pipelines can take up a lot of time. If you're feeling swamped or just want someone to bounce ideas off of (since I am having trouble reproducing the issue), I'd be glad to step in and help in way that I can. If you've got it fully under control though, totally feel free to ignore this message 🙂
Just wanted to offer some support.

@dingo4dev

Copy link
Copy Markdown
Contributor Author

Hi @molcay, Thank you so much for checking in and offering help!
Some extra eyes would be amazing. Right now I am investing the failure ci test on Mysql, the concurrent tests sometimes failure and orphan few asset event, and I can't reproduce it in my local tho.
I had logged the asset event and dag run with id and timestamp.

_______________________________________________________________ 
TestSchedulerJob.test_create_dag_runs_when_concurrent_asset_events_created
 _______________________________________________________________
airflow-core/tests/unit/jobs/test_scheduler_job.py:5792: 
in test_create_dag_runs_when_concurrent_asset_events_created
    assert total_consumed_asset_events == ASSET_EVENT_COUNT
E   assert 29 == 30
-------------------------------------------- Captured stdout setup -----------------------------------------
    ] Filling up the DagBag from /dev/null [airflow.dag_processing.dagbag.DagBag]
-------------------------------------------- Captured stdout call ------------------------------------------
Missing AssetEvent Metadata: {(16, '2026-06-23T05:02:05.983645+00:00')}
{'event_': (8, '2026-06-23T05:02:05.967063+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (9, '2026-06-23T05:02:05.972418+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (13, '2026-06-23T05:02:05.982971+00:00'), 'dagrun': (436, '2026-06-23T05:02:05.982971+00:00')}
{'event_': (16, '2026-06-23T05:02:05.983645+00:00'), 'dagrun': ()}
{'event_': (10, '2026-06-23T05:02:05.989838+00:00'), 'dagrun': (434, '2026-06-23T05:02:05.989838+00:00')}
{'event_': (15, '2026-06-23T05:02:05.994289+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (14, '2026-06-23T05:02:05.995763+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (11, '2026-06-23T05:02:06.025005+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (12, '2026-06-23T05:02:06.034978+00:00'), 'dagrun': (435, '2026-06-23T05:02:06.034978+00:00')}
{'event_': (18, '2026-06-23T05:02:06.059185+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (17, '2026-06-23T05:02:06.060623+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (19, '2026-06-23T05:02:06.099119+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (20, '2026-06-23T05:02:07.001801+00:00'), 'dagrun': (437, '2026-06-23T05:02:07.001801+00:00')}
{'event_': (21, '2026-06-23T05:02:07.022033+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (22, '2026-06-23T05:02:07.083870+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (23, '2026-06-23T05:02:07.090158+00:00'), 'dagrun': (438, '2026-06-23T05:02:07.090158+00:00')}
{'event_': (24, '2026-06-23T05:02:07.097476+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (25, '2026-06-23T05:02:07.116000+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (26, '2026-06-23T05:02:08.046920+00:00'), 'dagrun': (439, '2026-06-23T05:02:08.046920+00:00')}
{'event_': (27, '2026-06-23T05:02:08.054838+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (28, '2026-06-23T05:02:08.067990+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (29, '2026-06-23T05:02:08.092453+00:00'), 'dagrun': (440, '2026-06-23T05:02:08.092453+00:00')}
{'event_': (30, '2026-06-23T05:02:08.094004+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (31, '2026-06-23T05:02:08.118823+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (32, '2026-06-23T05:02:08.125311+00:00'), 'dagrun': (441, '2026-06-23T05:02:08.125311+00:00')}
{'event_': (33, '2026-06-23T05:02:08.127450+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (34, '2026-06-23T05:02:08.154315+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (35, '2026-06-23T05:02:09.073822+00:00'), 'dagrun': (442, '2026-06-23T05:02:09.073822+00:00')}
{'event_': (36, '2026-06-23T05:02:09.091265+00:00'), 'dagrun': (443, '2026-06-23T05:02:09.091265+00:00')}
{'event_': (37, '2026-06-23T05:02:09.118804+00:00'), 'dagrun': (444, '2026-06-23T05:02:09.118804+00:00')}

Comment thread airflow-core/tests/unit/jobs/test_scheduler_job.py Outdated
Comment thread airflow-core/tests/unit/jobs/test_scheduler_job.py Outdated
@molcay

molcay commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi @molcay, Thank you so much for checking in and offering help! Some extra eyes would be amazing. Right now I am investing the failure ci test on Mysql, the concurrent tests sometimes failure and orphan few asset event, and I can't reproduce it in my local tho. I had logged the asset event and dag run with id and timestamp.

_______________________________________________________________ 
TestSchedulerJob.test_create_dag_runs_when_concurrent_asset_events_created
 _______________________________________________________________
airflow-core/tests/unit/jobs/test_scheduler_job.py:5792: 
in test_create_dag_runs_when_concurrent_asset_events_created
    assert total_consumed_asset_events == ASSET_EVENT_COUNT
E   assert 29 == 30

Hi @dingo4dev,
As I see that you already mark the PR as "ready for review" and you've already got an approval, I need to ask if you still need help on the matter?

@Lee-W Lee-W left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few nits, but overall looks good

@@ -0,0 +1,24 @@
Fix a race condition that could cause asset-triggered Dags to be skipped

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not user-facing but more like internal improvement. I feel we don't need a new fragment for this. Even if we really want to, a bug newsfragment should be enough

Comment thread airflow-core/tests/unit/models/test_taskinstance.py Outdated
Comment thread airflow-core/tests/unit/models/test_taskinstance.py Outdated
Comment thread airflow-core/tests/unit/models/test_taskinstance.py Outdated
Comment thread airflow-core/tests/unit/jobs/test_scheduler_job.py Outdated
dags_to_queue: set[DagModel],
event: AssetEvent,
session: Session,
dialect_name: str | None,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will this be None?

@dingo4dev dingo4dev Jul 4, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this because the statics check (mypy-airflow-core) failed before.
This code dialect_name = get_dialect_name(session) may return None tho

dingo4dev added 4 commits July 5, 2026 04:34
Fixes an issue where the Scheduler misses AssetEvents due to a
visibility gap between flush() and commit().

When multiple tasks create events for the same asset, the AssetEvent
timestamp is generated during flush, but the record remains invisible
to the Scheduler until the final commit. If the AssetDagRunQueue is
processed in the interim, the 'late-committing' event is orphaned.
The job scheduler will read the AssetDagRunQueue and fetch the created_at column as the triggered_date which will use for fetching the asset event and assign dag `run_after`.

This change ensures timestamps are synchronized and the transaction
boundary is tightened to prevent data-aware scheduling misses.

Add check for asset events before creating asset-triggered DAG runs

This is to prevent double triggering when the late-commit asset_dag_run_queue is exist in entry but the asset event is already process in prev dag

Fix update statement condition in AssetDagRunQueue to use correct column reference

Add tests for concurrent asset events and no asset event scenarios in DAG run creation

linting tests

make mypy happy

Add check for existing asset DAG run queue entry before merge

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

add log when no dag run without asset event

Support concurrent asset event queuing on MySQL

Implement a MySQL-specific optimization using `ON DUPLICATE KEY UPDATE` when queuing asset-triggered DAG runs. This prevents unique constraint violations during concurrent asset events, mirroring the existing PostgreSQL implementation. The concurrency tests are also refactored to verify correct behavior across both database dialects.

Refactor asset event handling and improve session management in AssetManager

make lint happy

Refactor asset event handling to streamline session management in AssetManager

Re-attach asset_event to caller's session to prevent DetachedInstanceError

fix: prevent race condition causing lost events in asset-triggered DAG run creation

- Remove CTE lower-bound filter from asset event query: the bound excluded
  late-committed events that arrived after a concurrent DagRun was created,
  causing those events to be permanently lost and no follow-up DagRun created.
- Add 'not consumed' filter using association_table to prevent duplicate
  consumption of asset events across concurrent DagRuns.
- Always delete ADRQ rows after processing, even when no new DagRun is
  created, to prevent stale entries accumulating and causing infinite
  scheduler loops.
- Fix concurrent test to use seen_dr_ids set instead of prev_dr reference,
  ensuring all DagRuns are counted regardless of creation order.

fix: Persist AssetEvent in independent session for immediate visibility

Persist AssetEvent using a truly independent session (`scoped=False`)
to ensure it's committed and visible to processes like the Scheduler
before other transaction operations proceed. This closes a critical
visibility gap for Asset-Triggered DagRuns (ADRQ).

The event is then explicitly reloaded into the caller's session
to prevent DetachedInstanceError and allow subsequent relationship
operations to function correctly.

make static check happy

refactor: Consolidate `AssetEvent` persistence logic

Extract the complex logic for persisting `AssetEvent` instances into a new
dedicated class method, `create_asset_event`.

This centralizes the handling of distinct session management strategies
(independent session for non-SQLite, direct flush for SQLite) and ensures
immediate visibility of asset events to the scheduler. It also re-attaches
the event to the caller's session to prevent `DetachedInstanceError`.

fix: Ensure DagRun updates and AssetEvents are visible across transaction boundaries

Add `session.flush()` after `DagRun` partition key updates to ensure these changes are persisted within the caller's transaction before an independent session for `AssetEvent` creation begins. This prevents stale `DagRun` data from being read.

Change `AssetEvent` creation to use `create_session(scoped=False)`, guaranteeing a truly independent session and immediate commit of the asset event. This ensures the event is visible to processes like the Scheduler for Asset-Triggered DagRuns (ADRQ) without transactional delays.

fix: Change log level from warning to info for DagRun creation status

apply copilot reviewer

test: Add backend markers for concurrent asset event tests

docs: Address asset event scheduling race condition by using independent session commits
Ensures the asset-triggered consumer DAG is registered in the session prior to creating the AssetEvent. This prevents potential test failures or incorrect behavior due to timing or visibility issues with DAG metadata, aligning the test setup with real-world scheduler requirements.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Scheduler including HA (high availability) scheduler ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants