Skip to content

Commit 08cbf99

Browse files
committed
ER: move WAL writer from global state to runner context.
1 parent ac52fa4 commit 08cbf99

6 files changed

Lines changed: 36 additions & 36 deletions

File tree

finecode_extension_runner/src/finecode_extension_runner/_services/run_action.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
context,
1818
domain,
1919
er_wal,
20-
global_state,
2120
partial_result_sender as partial_result_sender_module,
2221
run_utils,
2322
schemas,
@@ -356,7 +355,7 @@ async def run_action(
356355
)
357356
)
358357
er_wal.emit_run_event(
359-
global_state.wal_writer,
358+
runner_context.wal_writer,
360359
event_type=er_wal.ErWalEventType.HANDLER_PARTS_STARTED,
361360
wal_run_id=wal_run_id or "",
362361
action_name=action_def.name,
@@ -422,7 +421,7 @@ async def run_action(
422421
" See ER logs for more details"
423422
) from eg
424423
er_wal.emit_run_event(
425-
global_state.wal_writer,
424+
runner_context.wal_writer,
426425
event_type=er_wal.ErWalEventType.HANDLER_PARTS_COMPLETED,
427426
wal_run_id=wal_run_id or "",
428427
action_name=action_def.name,
@@ -546,7 +545,7 @@ async def run_action(
546545

547546
if tracking_sender is not None and tracking_sender.has_sent:
548547
er_wal.emit_run_event(
549-
global_state.wal_writer,
548+
runner_context.wal_writer,
550549
event_type=er_wal.ErWalEventType.PARTIAL_RESULT_FINAL_SENT,
551550
wal_run_id=wal_run_id,
552551
action_name=action_def.name,
@@ -625,7 +624,7 @@ async def run_action_raw(
625624
meta = dataclasses.replace(options.meta, wal_run_id=wal_run_id)
626625

627626
er_wal.emit_run_event(
628-
global_state.wal_writer,
627+
runner_context.wal_writer,
629628
event_type=er_wal.ErWalEventType.RUN_DISPATCHED,
630629
wal_run_id=wal_run_id,
631630
action_name=request.action_name,
@@ -991,7 +990,7 @@ async def execute_action_handler(
991990
logger.trace(f"R{run_id} | Run {handler.name} on {str(payload)[:100]}...")
992991
if wal_run_id is not None:
993992
er_wal.emit_run_event(
994-
global_state.wal_writer,
993+
runner_context.wal_writer,
995994
event_type=er_wal.ErWalEventType.HANDLER_STARTED,
996995
wal_run_id=wal_run_id,
997996
action_name=action_name,
@@ -1068,7 +1067,7 @@ def get_run_context(param_type):
10681067
and not tracking_sender.has_sent
10691068
):
10701069
er_wal.emit_run_event(
1071-
global_state.wal_writer,
1070+
runner_context.wal_writer,
10721071
event_type=er_wal.ErWalEventType.PARTIAL_RESULT_FIRST_SENT,
10731072
wal_run_id=wal_run_id,
10741073
action_name=action_name,
@@ -1124,7 +1123,7 @@ def get_run_context(param_type):
11241123
logger.exception(exception)
11251124
if wal_run_id is not None:
11261125
er_wal.emit_run_event(
1127-
global_state.wal_writer,
1126+
runner_context.wal_writer,
11281127
event_type=er_wal.ErWalEventType.HANDLER_FAILED,
11291128
wal_run_id=wal_run_id,
11301129
action_name=action_name,
@@ -1145,7 +1144,7 @@ def get_run_context(param_type):
11451144
)
11461145
if wal_run_id is not None:
11471146
er_wal.emit_run_event(
1148-
global_state.wal_writer,
1147+
runner_context.wal_writer,
11491148
event_type=er_wal.ErWalEventType.HANDLER_COMPLETED,
11501149
wal_run_id=wal_run_id,
11511150
action_name=action_name,
@@ -1215,7 +1214,7 @@ async def run_subresult_coros_concurrently(
12151214
return None
12161215
if tracking_sender is not None and wal_run_id is not None and not tracking_sender.has_sent:
12171216
er_wal.emit_run_event(
1218-
global_state.wal_writer,
1217+
runner_context.wal_writer,
12191218
event_type=er_wal.ErWalEventType.PARTIAL_RESULT_FIRST_SENT,
12201219
wal_run_id=wal_run_id,
12211220
action_name=action_name,
@@ -1275,7 +1274,7 @@ async def run_subresult_coros_sequentially(
12751274
return None
12761275
if tracking_sender is not None and wal_run_id is not None and not tracking_sender.has_sent:
12771276
er_wal.emit_run_event(
1278-
global_state.wal_writer,
1277+
runner_context.wal_writer,
12791278
event_type=er_wal.ErWalEventType.PARTIAL_RESULT_FIRST_SENT,
12801279
wal_run_id=wal_run_id,
12811280
action_name=action_name,

finecode_extension_runner/src/finecode_extension_runner/cli.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,21 @@ def start(
5757
global_state.log_level = "INFO" if trace is False else "TRACE"
5858
global_state.project_dir_path = project_path
5959
global_state.env_name = env_name
60-
if wal:
61-
global_state.wal_writer = er_wal.ErWalWriter()
60+
wal_writer = er_wal.ErWalWriter() if wal else None
6261

6362
log_file_path = (project_path
6463
/ ".venvs"
6564
/ env_name
6665
/ "logs"
6766
/ "runner"
6867
/ "runner.log")
69-
68+
7069
global_state.log_file_path = logs.setup_logging(log_level="INFO" if trace is False else "TRACE", log_file_path=log_file_path)
7170

7271
if debug is True:
7372
logger.info(f"Started debugger on 127.0.0.1:{debug_port}")
7473

75-
runner_start.start_runner_sync()
74+
runner_start.start_runner_sync(wal_writer=wal_writer)
7675

7776

7877
@main.command()

finecode_extension_runner/src/finecode_extension_runner/context.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from dataclasses import dataclass, field
4-
from finecode_extension_runner import domain
4+
from finecode_extension_runner import domain, er_wal
55
from finecode_extension_runner.di.registry import Registry
66
from finecode_extension_api import service
77

@@ -15,3 +15,4 @@ class RunnerContext:
1515
running_services: dict[service.Service, domain.RunningServiceInfo] = field(
1616
default_factory=dict
1717
)
18+
wal_writer: er_wal.ErWalWriter | None = None

finecode_extension_runner/src/finecode_extension_runner/er_server.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ def __init__(self) -> None:
178178
self._stop_event = threading.Event()
179179
self._tcp_server: asyncio.Server | None = None
180180
self._runner_context: context.RunnerContext | None = None
181+
self._wal_writer: er_wal.ErWalWriter | None = None
181182

182183
# ------------------------------------------------------------------
183184
# Server → client helpers
@@ -346,8 +347,8 @@ async def _on_initialized(_server: ErServer, params: dict | None) -> None:
346347

347348
async def _on_shutdown(server: ErServer, _params: dict | None) -> None:
348349
logger.info("Shutdown extension runner")
349-
if global_state.wal_writer is not None:
350-
global_state.wal_writer.close()
350+
if server._wal_writer is not None:
351+
server._wal_writer.close()
351352
services.shutdown_all_action_handlers(server._runner_context)
352353

353354
logger.debug("Stop Finecode async tasks")
@@ -362,8 +363,8 @@ async def _on_shutdown(server: ErServer, _params: dict | None) -> None:
362363

363364
async def _on_exit(_server: ErServer, _params: dict | None) -> None:
364365
logger.info("Exit extension runner")
365-
if global_state.wal_writer is not None:
366-
global_state.wal_writer.close()
366+
if _server._wal_writer is not None:
367+
_server._wal_writer.close()
367368

368369

369370
async def _document_did_open(server: ErServer, params: dict | None) -> None:
@@ -479,6 +480,7 @@ async def _send_request_to_wm(method: str, req_params: dict):
479480
project_raw_config_getter=functools.partial(get_project_raw_config, server),
480481
send_request_to_wm=_send_request_to_wm,
481482
)
483+
runner_context.wal_writer = server._wal_writer
482484
server._runner_context = runner_context
483485

484486
file_editor = await runner_context.di_registry.get_instance(ifileeditor.IFileEditor)
@@ -541,7 +543,7 @@ async def run_action(server: ErServer, params: dict | None) -> dict:
541543
return {"error": "Extension runner not initialized"}
542544
project_path = server._runner_context.project.dir_path
543545
er_wal.emit_run_event(
544-
global_state.wal_writer,
546+
server._wal_writer,
545547
event_type=er_wal.ErWalEventType.RUN_ACCEPTED,
546548
wal_run_id=wal_run_id,
547549
action_name=action_name,
@@ -569,7 +571,7 @@ async def run_action(server: ErServer, params: dict | None) -> dict:
569571
status = "stopped"
570572
response = exception.response
571573
er_wal.emit_run_event(
572-
global_state.wal_writer,
574+
server._wal_writer,
573575
event_type=er_wal.ErWalEventType.RUN_FAILED,
574576
wal_run_id=wal_run_id,
575577
action_name=action_name,
@@ -587,7 +589,7 @@ async def run_action(server: ErServer, params: dict | None) -> dict:
587589
logger.exception(exception)
588590
error_msg = f"{type(exception)}: {str(exception)}"
589591
er_wal.emit_run_event(
590-
global_state.wal_writer,
592+
server._wal_writer,
591593
event_type=er_wal.ErWalEventType.RUN_FAILED,
592594
wal_run_id=wal_run_id,
593595
action_name=action_name,
@@ -606,7 +608,7 @@ async def run_action(server: ErServer, params: dict | None) -> dict:
606608
for fmt, result in result_by_format.items()
607609
}
608610
er_wal.emit_run_event(
609-
global_state.wal_writer,
611+
server._wal_writer,
610612
event_type=er_wal.ErWalEventType.RUN_COMPLETED,
611613
wal_run_id=wal_run_id,
612614
action_name=action_name,
@@ -647,7 +649,7 @@ async def run_handlers(server: ErServer, params: dict | None) -> dict:
647649
return {"error": "Extension runner not initialized"}
648650
project_path = server._runner_context.project.dir_path
649651
er_wal.emit_run_event(
650-
global_state.wal_writer,
652+
server._wal_writer,
651653
event_type=er_wal.ErWalEventType.RUN_ACCEPTED,
652654
wal_run_id=wal_run_id,
653655
action_name=action_name,
@@ -681,7 +683,7 @@ async def run_handlers(server: ErServer, params: dict | None) -> dict:
681683
logger.exception(exception)
682684
error_msg = f"{type(exception)}: {str(exception)}"
683685
er_wal.emit_run_event(
684-
global_state.wal_writer,
686+
server._wal_writer,
685687
event_type=er_wal.ErWalEventType.RUN_FAILED,
686688
wal_run_id=wal_run_id,
687689
action_name=action_name,
@@ -700,7 +702,7 @@ async def run_handlers(server: ErServer, params: dict | None) -> dict:
700702
for fmt, result in result_by_format.items()
701703
}
702704
er_wal.emit_run_event(
703-
global_state.wal_writer,
705+
server._wal_writer,
704706
event_type=er_wal.ErWalEventType.RUN_COMPLETED,
705707
wal_run_id=wal_run_id,
706708
action_name=action_name,
@@ -804,9 +806,10 @@ async def get_runner_info(_server: ErServer, _params: dict | None) -> dict:
804806
# ---------------------------------------------------------------------------
805807

806808

807-
def create_er_server() -> ErServer:
809+
def create_er_server(wal_writer: er_wal.ErWalWriter | None = None) -> ErServer:
808810
"""Create and wire the ER server with all handlers registered."""
809811
server = ErServer()
812+
server._wal_writer = wal_writer
810813
session = server._session
811814

812815
def _wrap(handler):
@@ -861,8 +864,8 @@ async def _on_progress_from_wm(params: dict | None) -> None:
861864

862865
def on_process_exit() -> None:
863866
logger.info("Exit extension runner (atexit)")
864-
if global_state.wal_writer is not None:
865-
global_state.wal_writer.close()
867+
if server._wal_writer is not None:
868+
server._wal_writer.close()
866869
services.shutdown_all_action_handlers(server._runner_context)
867870
services.exit_all_action_handlers(server._runner_context)
868871

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
from pathlib import Path
22
from typing import Literal
33

4-
from finecode_extension_runner import er_wal
5-
64
project_dir_path: Path | None = None
75
log_level: Literal["TRACE", "INFO"] = "INFO"
86
env_name: str = ""
97
log_file_path: Path | None = None
10-
wal_writer: er_wal.ErWalWriter | None = None

finecode_extension_runner/src/finecode_extension_runner/start.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@
44

55
from loguru import logger
66

7-
import finecode_extension_runner.global_state as global_state
87
import finecode_extension_runner.er_server as extension_runner_er
8+
import finecode_extension_runner.global_state as global_state
9+
from finecode_extension_runner import er_wal
910

1011

11-
def start_runner_sync() -> None:
12+
def start_runner_sync(wal_writer: er_wal.ErWalWriter | None = None) -> None:
1213
assert global_state.project_dir_path is not None
1314

1415
logger.info(f"Python executable: {sys.executable}")
1516
logger.info(f"Project path: {global_state.project_dir_path}")
1617
logger.info(f"Process id: {os.getpid()}")
1718

18-
server = extension_runner_er.create_er_server()
19+
server = extension_runner_er.create_er_server(wal_writer=wal_writer)
1920
# asyncio.run(server.start_io_async())
2021
port = _find_free_port()
2122
server.start_tcp(host="127.0.0.1", port=port)

0 commit comments

Comments
 (0)