|
8 | 8 | import time |
9 | 9 | import re |
10 | 10 | import yaml |
| 11 | +import asyncio |
11 | 12 | from urllib.parse import urlparse |
12 | 13 | from collections import deque |
13 | 14 | from dataclasses import dataclass, field |
@@ -94,6 +95,9 @@ class ActiveJob: |
94 | 95 | # Active jobs (keyed by job_id for cancellation lookup) |
95 | 96 | _active_jobs: dict[str, ActiveJob] = {} |
96 | 97 |
|
| 98 | +# Initialization lock for run configs to prevent race conditions |
| 99 | +_run_config_lock = asyncio.Lock() |
| 100 | + |
97 | 101 | # Stale run cleanup threshold (24 hours) |
98 | 102 | RUN_STALE_THRESHOLD_SECONDS = 86400 |
99 | 103 |
|
@@ -556,19 +560,21 @@ async def github_webhook(request: Request): |
556 | 560 | _processed_jobs[str(job_id)] = current_time |
557 | 561 |
|
558 | 562 | # Get or create run config for this workflow run |
559 | | - if run_id not in _run_configs: |
560 | | - # Fetch max-parallel from workflow YAML |
561 | | - max_parallel = await fetch_workflow_max_parallel( |
562 | | - repo_url, workflow_name, os.environ["GITHUB_TOKEN"] |
563 | | - ) |
564 | | - _run_configs[run_id] = RunConfig( |
565 | | - max_parallel=max_parallel, |
566 | | - workflow_name=workflow_name, |
567 | | - ) |
568 | | - logger.info( |
569 | | - f"Created run config for {repo_full_name}/{workflow_name} " |
570 | | - f"(run_id={run_id}) with max_parallel={max_parallel}" |
571 | | - ) |
| 563 | + # Use lock to prevent race conditions when multiple webhooks arrive simultaneously |
| 564 | + async with _run_config_lock: |
| 565 | + if run_id not in _run_configs: |
| 566 | + # Fetch max-parallel from workflow YAML |
| 567 | + max_parallel = await fetch_workflow_max_parallel( |
| 568 | + repo_url, workflow_name, os.environ["GITHUB_TOKEN"] |
| 569 | + ) |
| 570 | + _run_configs[run_id] = RunConfig( |
| 571 | + max_parallel=max_parallel, |
| 572 | + workflow_name=workflow_name, |
| 573 | + ) |
| 574 | + logger.info( |
| 575 | + f"Created run config for {repo_full_name}/{workflow_name} " |
| 576 | + f"(run_id={run_id}) with max_parallel={max_parallel}" |
| 577 | + ) |
572 | 578 |
|
573 | 579 | run_config = _run_configs[run_id] |
574 | 580 | queue_position = len(run_config.queue) + run_config.active_count + 1 |
|
0 commit comments