Skip to content

Commit 47e9683

Browse files
committed
Add running of single handlers (compared to whole actions) to WM-ER protocol and implement it
1 parent b9fa629 commit 47e9683

11 files changed

Lines changed: 855 additions & 75 deletions

File tree

docs/wm-er-protocol.md

Lines changed: 144 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@ method names.
2525
3. WM sends `initialized`.
2626
4. WM sends `finecodeRunner/updateConfig` to bootstrap handlers and services.
2727
- ER processes it and returns `{}`.
28-
5. WM sends `finecodeRunner/resolveActionSources` to get canonical action source paths.
29-
- ER returns a sparse map of `configSource → canonicalSource` for actions whose
30-
declared config path differs from the fully qualified runtime path.
28+
5. WM sends `finecodeRunner/resolveActionMeta` to get action meta info.
29+
- ER returns a complete map of `configSource → { canonical_source, runs_concurrently }` for every action
30+
whose class can be imported in this env. Actions that fail to import are omitted.
3131
- WM stores these on its `Action` domain objects before the runner is considered ready.
3232
6. On shutdown: WM sends `shutdown` then `exit`.
3333

34+
> **Multi-env runs:** when an action's handlers span more than one env, the WM
35+
> orchestrates execution segment-by-segment using `actions/runHandlers` so that
36+
> the serialized run context (`previousResult`) crosses the wire only at actual
37+
> env boundaries. See [Multi-Env Action Orchestration](#multi-env-action-orchestration).
38+
3439
## Message Catalog
3540

3641
### WM -> ER
@@ -110,6 +115,58 @@ method names.
110115
- Note: `result_by_format` is a JSON-encoded string (not a nested object) —
111116
the WM decodes it with `json.loads` after receiving the response.
112117

118+
- `actions/runHandlers`
119+
- Runs a named subset of an action's handlers sequentially within this ER,
120+
seeding `context.current_result` from a prior segment's serialized result.
121+
Used by the WM to orchestrate multi-env action runs; not used for single-env
122+
actions (those still use `actions/run`).
123+
- Params:
124+
- `actionName` (string): action name as registered via `finecodeRunner/updateConfig`
125+
- `handlerNames` (list of string): ordered list of handler names to execute;
126+
all must belong to this ER's env
127+
- `previousResult` (object | null): serialized `RunActionResult`
128+
(`dataclasses.asdict`) from the last handler of the preceding segment, or
129+
`null` for the first segment. Reconstructed as `context.current_result`
130+
before the first handler in `handlerNames` is invoked.
131+
- `options` (object | null): same keys as `actions/run`. `resultFormats`
132+
should be omitted (or `[]`) for intermediate segments and non-empty only
133+
for the final segment of a run.
134+
- Result (success):
135+
```json
136+
{
137+
"status": "success",
138+
"result": {"<resultField>": "..."},
139+
"resultByFormat": {"json": {"...": "..."}, "string": "..."},
140+
"returnCode": 0
141+
}
142+
```
143+
- `result`: serialized `RunActionResult` after all specified handlers ran
144+
(`dataclasses.asdict`). Pass as `previousResult` to the next segment's
145+
`actions/runHandlers` call.
146+
- `resultByFormat`: formatted results in the requested formats; `{}` when
147+
`resultFormats` was empty in options.
148+
- Result (streamed): used when `partialResultToken` was provided and all
149+
results were delivered via `$/progress`. `result` is still populated for
150+
context chaining.
151+
```json
152+
{
153+
"status": "streamed",
154+
"result": {"<resultField>": "..."},
155+
"resultByFormat": {},
156+
"returnCode": 0
157+
}
158+
```
159+
- Result (stopped):
160+
```json
161+
{
162+
"status": "stopped",
163+
"result": {"<resultField>": "..."},
164+
"resultByFormat": {"json": {"...": "..."}},
165+
"returnCode": 1
166+
}
167+
```
168+
- Result (error): `{"error": "message"}`
169+
113170
- `actions/getPayloadSchemas`
114171
- Params: `{}`
115172
- Result: `{ action_name: JSON Schema fragment | null }`
@@ -120,33 +177,34 @@ method names.
120177

121178
- `actions/mergeResults`
122179
- Params: `{ "actionName": string, "results": list }`
123-
- Result: `{ "merged": ... }` or `{ "error": "..." }`
180+
- `results`: list of serialized `RunActionResult` objects (`dataclasses.asdict`),
181+
one per concurrent segment or handler. Used by the WM after a concurrent
182+
multi-env run to merge the per-env results into a single final result.
183+
- Result: `{ "merged": <serialized RunActionResult> }` or `{ "error": "..." }`
124184

125185
- `actions/reload`
126186
- Params: `{ "actionName": string }`
127187
- Result: `{}`
128188

129-
- `finecodeRunner/resolveActionSources`
189+
- `finecodeRunner/resolveActionMeta`
130190
- Params: `{}` (no params)
131-
- Result: sparse map of `{ "<configSource>": "<canonicalSource>", ... }` for actions
132-
whose declared config path differs from the fully qualified runtime path.
133-
Only entries where the two differ are included.
134-
Example: `{ "myext.LintAction": "myext.actions.lint.LintAction" }`
191+
- Result: complete map of `{ "<configSource>": { "canonical_source": string, "runs_concurrently": bool }, ... }` for every
192+
action whose class can be imported in this env. Actions that fail to import are
193+
omitted entirely.
194+
Example: `{ "myext.LintAction": { "canonical_source": "myext.actions.lint.LintAction", "runs_concurrently": true } }`
135195
- Called by the WM after `finecodeRunner/updateConfig` completes to store canonical
136-
sources on its `Action` domain objects before the runner is considered ready.
137-
The WM uses `canonical_source` as the primary identifier in all subsequent action
138-
lookups; `source` (from config) is the fallback for actions whose class could not
139-
be imported in this env.
140-
- Actions where `cls.__module__ + "." + cls.__qualname__ == config source` are
141-
omitted (no mapping needed — the config source is already canonical).
196+
sources and execution modes on its `Action` domain objects before the runner is
197+
considered ready. The WM uses `canonical_source` as the primary identifier in all
198+
subsequent action lookups. Actions absent from the response (import failure) keep
199+
`canonical_source = None` until another runner for the same project resolves them.
142200

143201
- `actions/resolveSource`
144202
- Params: `{ "source": string }` — an arbitrary import-path alias to resolve.
145203
- Result: `{ "canonicalSource": string }` — the fully qualified class path
146204
(`cls.__module__ + "." + cls.__qualname__`).
147205
- Raises a JSON-RPC error if the alias cannot be imported or resolved.
148206
- Used during action lookup when a caller provides an alias not already known
149-
from `finecodeRunner/resolveActionSources` (full ADR-0019 support).
207+
from `finecodeRunner/resolveActionMeta` (full ADR-0019 support).
150208

151209
- `packages/resolvePath`
152210
- Params: `{ "packageName": string }`
@@ -180,7 +238,7 @@ method names.
180238
`f"{cls.__module__}.{cls.__qualname__}"` (e.g. `"myext.actions.lint.LintAction"`).
181239
Must not be a re-exported alias such as `"myext.LintAction"`. The WM resolves
182240
the action name by matching against the canonical source reported by
183-
`finecodeRunner/resolveActionSources`; a re-exported path will not match and the
241+
`finecodeRunner/resolveActionMeta`; a re-exported path will not match and the
184242
request will fail.
185243
- `payload` (object): serialized action payload (`dataclasses.asdict`)
186244
- `meta` (object): `{ "trigger": string, "devEnv": string, "orchestrationDepth": int }`
@@ -202,11 +260,76 @@ method names.
202260

203261
- `$/progress`
204262
- Params: `{ "token": <token>, "value": "<stringified JSON partial result>" }`
205-
- The `token` must match `partial_result_token` from `actions/run`.
263+
- The `token` must match `partialResultToken` from `actions/run` or
264+
`actions/runHandlers`.
206265
- `value` is a JSON string produced by the ER from a partial run result.
207-
- When `$/progress` is used to deliver results, the final `actions/run` response
208-
must have `status: "streamed"` and empty `result_by_format`. See `actions/run`
209-
result (streamed) above.
266+
- When `$/progress` is used to deliver results, the final `actions/run` or
267+
`actions/runHandlers` response must have `status: "streamed"` and empty
268+
`result_by_format`. See result (streamed) entries above.
269+
270+
## Multi-Env Action Orchestration
271+
272+
When an action's handlers span more than one env, the WM cannot delegate the
273+
whole run to a single ER via `actions/run`. Instead the WM becomes the
274+
orchestrator and drives execution using `actions/runHandlers`.
275+
276+
### Sequential mode (default)
277+
278+
The WM groups the action's handlers into **consecutive same-env segments**:
279+
280+
```text
281+
handlers: [h1/env1, h2/env1, h3/env1, h4/env2]
282+
segments: [(env1, [h1, h2, h3]), (env2, [h4])]
283+
284+
handlers: [h1/env1, h2/env2, h3/env1]
285+
segments: [(env1, [h1]), (env2, [h2]), (env1, [h3])]
286+
```
287+
288+
Execution:
289+
290+
1. WM calls `actions/runHandlers` for segment 1 with `previousResult: null`.
291+
2. For each subsequent segment, WM calls `actions/runHandlers` on that segment's
292+
ER with `previousResult` set to the `result` returned by the previous call.
293+
The ER reconstructs this as `context.current_result` before the first handler
294+
in the segment runs.
295+
3. If any call returns `status: "stopped"`, WM stops the chain and returns that
296+
result to the caller.
297+
4. `resultFormats` is passed only in the final segment's options — earlier
298+
segments return `resultByFormat: {}` to avoid unnecessary serialization.
299+
5. WM assembles the final response from the last segment's `result` and
300+
`resultByFormat`.
301+
302+
### Concurrent mode
303+
304+
The WM groups handlers by env (order within an env does not matter for
305+
concurrent execution):
306+
307+
```text
308+
handlers: [h1/env1, h2/env2, h3/env1]
309+
groups: [(env1, [h1, h3]), (env2, [h2])]
310+
```
311+
312+
Execution:
313+
314+
1. WM dispatches `actions/runHandlers` to all env groups in parallel, all with
315+
`previousResult: null`.
316+
2. WM collects all `result` objects from the parallel calls.
317+
3. WM calls `actions/mergeResults` on any available ER for the action, passing
318+
the collected `result` objects.
319+
4. The merged result and its formatted representation form the final response.
320+
321+
### Single-env actions
322+
323+
When all handlers are in the same env, the WM uses `actions/run` — a single
324+
delegated call where the ER manages handler sequencing internally.
325+
`actions/runHandlers` is only used when handlers span multiple envs.
326+
327+
### `walRunId` continuity
328+
329+
The WM generates a single `walRunId` for the whole logical action run and
330+
passes it in every `actions/runHandlers` call's options. Each ER emits WAL
331+
events tagged with that ID for the handler(s) it executes, so traces can be
332+
correlated across envs for the same logical run.
210333

211334
## Error Handling and Cancellation
212335

finecode_extension_runner/src/finecode_extension_runner/_services/run_action.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ async def run_action(
162162
run_id: int | None = None,
163163
partial_result_queue: asyncio.Queue | None = None,
164164
caller_kwargs: code_action.CallerRunContextKwargs | None = None,
165+
initial_result: code_action.RunActionResult | None = None,
165166
) -> code_action.RunActionResult | None:
166167
# design decisions:
167168
# - keep payload unchanged between all subaction runs.
@@ -259,7 +260,9 @@ async def run_action(
259260
# TODO: check run_context below, whether AsyncPlaceholder can really be used
260261
run_context = AsyncPlaceholderContext()
261262

262-
action_result: code_action.RunActionResult | None = None
263+
action_result: code_action.RunActionResult | None = initial_result
264+
if initial_result is not None:
265+
run_context_info.update(initial_result)
263266
runner_context = global_state.runner_context
264267

265268
# to be able to catch source of exceptions in user-accessible code more precisely,
@@ -518,6 +521,15 @@ async def run_action(
518521
action_result.update(handler_result)
519522

520523
run_context_info.update(action_result)
524+
525+
# Surface results sent via run_context.partial_result_sender.send()
526+
# (accumulated in accumulating_sender) — these are not captured by
527+
# handler return values but must contribute to the final result.
528+
if accumulating_sender is not None and accumulating_sender.has_sent:
529+
if action_result is None:
530+
action_result = accumulating_sender.accumulated
531+
elif accumulating_sender.accumulated is not None:
532+
action_result.update(accumulating_sender.accumulated)
521533
finally:
522534
# exit run context
523535
try:
@@ -673,6 +685,111 @@ def action_result_to_run_action_response(
673685
)
674686

675687

688+
async def run_handlers_raw(
689+
request: schemas.RunHandlersRequest, options: schemas.RunActionOptions
690+
) -> schemas.RunHandlersResponse:
691+
"""Execute a named subset of an action's handlers for multi-env orchestration.
692+
693+
Seeds context.current_result from request.previous_result before the first
694+
handler runs, so sequential handlers across env boundaries see a continuous
695+
result chain. Returns both the raw serialized result (for chaining) and
696+
formatted output (populated only when options.result_formats is non-empty).
697+
"""
698+
global last_run_id
699+
run_id = last_run_id
700+
last_run_id += 1
701+
702+
if global_state.runner_context is None:
703+
raise ActionFailedException(
704+
"run_handlers called before extension runner is initialized"
705+
)
706+
707+
project_def = global_state.runner_context.project
708+
709+
try:
710+
action = project_def.actions[request.action_name]
711+
except KeyError as exc:
712+
raise ActionFailedException(
713+
f"R{run_id} | Action {request.action_name} not found"
714+
) from exc
715+
716+
try:
717+
action_cache = global_state.runner_context.action_cache_by_name[request.action_name]
718+
except KeyError:
719+
action_cache = domain.ActionCache()
720+
global_state.runner_context.action_cache_by_name[request.action_name] = action_cache
721+
722+
if action_cache.exec_info is None:
723+
action_cache.exec_info = create_action_exec_info(action)
724+
action_exec_info = action_cache.exec_info
725+
726+
# Build a filtered ActionDeclaration containing only the requested handlers,
727+
# in the order specified by handler_names (preserves WM segment ordering).
728+
handler_names_ordered = request.handler_names
729+
handlers_by_name = {h.name: h for h in action.handlers}
730+
filtered_handlers = [
731+
handlers_by_name[name]
732+
for name in handler_names_ordered
733+
if name in handlers_by_name
734+
]
735+
filtered_action = domain.ActionDeclaration(
736+
name=action.name,
737+
config=action.config,
738+
handlers=filtered_handlers,
739+
source=action.source,
740+
)
741+
742+
# Reconstruct the previous segment's result so handlers see a continuous
743+
# context.current_result across the env boundary.
744+
initial_result: code_action.RunActionResult | None = None
745+
if request.previous_result is not None and action_exec_info.result_type is not None:
746+
try:
747+
initial_result = action_exec_info.result_type(**request.previous_result)
748+
except Exception as exc:
749+
logger.warning(
750+
f"R{run_id} | Could not reconstruct previous_result for "
751+
f"{request.action_name}: {exc}. Handlers will see no prior context."
752+
)
753+
754+
# Build payload from params.
755+
payload: code_action.RunActionPayload | None = None
756+
if action_exec_info.payload_type is not None and request.params:
757+
payload_type_with_validation = pydantic_dataclass(action_exec_info.payload_type)
758+
payload = typing.cast(
759+
code_action.RunActionPayload,
760+
payload_type_with_validation(**request.params),
761+
)
762+
763+
wal_run_id = getattr(options, "wal_run_id", None)
764+
if not isinstance(wal_run_id, str) or wal_run_id.strip() == "":
765+
raise ActionFailedException("Missing required wal_run_id in run options")
766+
767+
meta = dataclasses.replace(options.meta, wal_run_id=wal_run_id)
768+
769+
action_result = await run_action(
770+
action_def=filtered_action,
771+
payload=payload,
772+
meta=meta,
773+
partial_result_token=options.partial_result_token,
774+
progress_token=options.progress_token,
775+
run_id=run_id,
776+
initial_result=initial_result,
777+
)
778+
779+
# Raw serialized result for chaining to the next segment.
780+
raw_result: dict = dataclasses.asdict(action_result) if action_result is not None else {}
781+
782+
# Formatted result — only populated when the caller requests formats.
783+
formatted = action_result_to_run_action_response(action_result, options.result_formats)
784+
result_by_format: dict = formatted.result_by_format or {}
785+
786+
return schemas.RunHandlersResponse(
787+
return_code=formatted.return_code,
788+
result=raw_result,
789+
result_by_format=result_by_format,
790+
)
791+
792+
676793
def create_action_exec_info(action: domain.ActionDeclaration) -> domain.ActionExecInfo:
677794
try:
678795
action_type_def = run_utils.import_module_member_by_source_str(action.source)

0 commit comments

Comments
 (0)