-
Notifications
You must be signed in to change notification settings - Fork 277
Add realtime_trace_jsonl recipe for structured real-time optimization progress streaming #1177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
08308d7
408b6da
868022f
9d2dfac
4ab23b2
1f1bead
2c9444c
211359a
2e8f3bd
afbe9e0
6bf7355
cc41cad
f646e51
b822dbc
7fd53cf
987251f
3478efb
fc869bc
c7d36b9
7e81977
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| import json | ||
|
|
||
| from pyscipopt import SCIP_EVENTTYPE, Eventhdlr | ||
|
|
||
|
|
||
| class _TraceRun: | ||
| """ | ||
| Record optimization progress in real time while the solver is running. | ||
|
|
||
| Args | ||
| ---- | ||
| model: pyscipopt.Model | ||
| path: str | None | ||
| - None: in-memory only | ||
| - str : also write JSONL (one JSON object per line) for streaming/real-time consumption | ||
|
|
||
| Returns | ||
| ------- | ||
| None | ||
| Updates `model.data["trace"]` as a side effect. | ||
|
|
||
| Usage | ||
| ----- | ||
| optimizeTrace(model) # real-time in-memory trace | ||
| optimizeTrace(model, path="trace.jsonl") # real-time JSONL stream + in-memory | ||
| optimizeNogilTrace(model, path="trace.jsonl") # nogil variant | ||
| """ | ||
|
|
||
| def __init__(self, model, path=None): | ||
| self.model = model | ||
| self.path = path | ||
| self._fh = None | ||
| self._handler = None | ||
| self._caught_events = set() | ||
| self._last_snapshot = {} | ||
|
|
||
| def __enter__(self): | ||
| if not hasattr(self.model, "data") or self.model.data is None: | ||
| self.model.data = {} | ||
| self.model.data["trace"] = [] | ||
|
|
||
| if self.path is not None: | ||
| self._fh = open(self.path, "w") | ||
|
|
||
| class _TraceEventhdlr(Eventhdlr): | ||
| def eventinit(hdlr): | ||
| for et in ( | ||
| SCIP_EVENTTYPE.BESTSOLFOUND, | ||
| SCIP_EVENTTYPE.DUALBOUNDIMPROVED, | ||
| ): | ||
| self.model.catchEvent(et, hdlr) | ||
| self._caught_events.add(et) | ||
|
|
||
| def eventexec(hdlr, event): | ||
| et = event.getType() | ||
| if et == SCIP_EVENTTYPE.BESTSOLFOUND: | ||
| snapshot = self._snapshot_now() | ||
| self._last_snapshot = snapshot | ||
| self._write_event("bestsol_found", fields=snapshot, flush=True) | ||
| elif et == SCIP_EVENTTYPE.DUALBOUNDIMPROVED: | ||
| snapshot = self._snapshot_now() | ||
| self._last_snapshot = snapshot | ||
| # Flush disabled: frequent event; OS buffering suffices | ||
| self._write_event( | ||
| "dualbound_improved", fields=snapshot, flush=False | ||
| ) | ||
|
|
||
| self._handler = _TraceEventhdlr() | ||
| self.model.includeEventhdlr( | ||
| self._handler, "realtime_trace_jsonl", "Realtime trace jsonl handler" | ||
| ) | ||
|
Comment on lines
+68
to
+71
|
||
|
|
||
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc, tb): | ||
| fields = {} | ||
| if self._last_snapshot: | ||
| fields.update(self._last_snapshot) | ||
|
|
||
| if exc_type is not None: | ||
| fields.update( | ||
| { | ||
| "status": "exception", | ||
| "exception": exc_type.__name__, | ||
| "message": str(exc) if exc is not None else None, | ||
| } | ||
| ) | ||
|
|
||
| try: | ||
| self._write_event("run_end", fields=fields, flush=True) | ||
| finally: | ||
| if self._fh: | ||
| try: | ||
| self._fh.close() | ||
| finally: | ||
| self._fh = None | ||
|
|
||
| if self._handler is not None: | ||
| for et in self._caught_events: | ||
| try: | ||
| self.model.dropEvent(et, self._handler) | ||
| except Exception: | ||
MySweetEden marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # Best-effort cleanup; continue dropping remaining events | ||
| pass | ||
| self._caught_events.clear() | ||
| self._handler = None | ||
|
|
||
| return False | ||
|
|
||
| def _snapshot_now(self) -> dict: | ||
| return { | ||
| "time": self.model.getSolvingTime(), | ||
| "primalbound": self.model.getPrimalbound(), | ||
| "dualbound": self.model.getDualbound(), | ||
| "gap": self.model.getGap(), | ||
| "nodes": self.model.getNNodes(), | ||
| "nsol": self.model.getNSols(), | ||
| } | ||
|
|
||
| def _write_event(self, event_type, fields=None, flush=True): | ||
| event = {"type": event_type} | ||
| if fields: | ||
| event.update(fields) | ||
|
|
||
| self.model.data["trace"].append(event) | ||
| if self._fh is not None: | ||
| self._fh.write(json.dumps(event) + "\n") | ||
| if flush: | ||
| self._fh.flush() | ||
|
|
||
|
|
||
def optimizeTrace(model, path=None):
    """Run ``model.optimize()`` while recording a real-time progress trace.

    The trace is stored in ``model.data["trace"]``; when *path* is given,
    it is additionally streamed to that file as JSONL.
    """
    tracer = _TraceRun(model, path)
    with tracer:
        model.optimize()
|
|
||
|
|
||
def optimizeNogilTrace(model, path=None):
    """Nogil variant of :func:`optimizeTrace`: runs ``model.optimizeNogil()``
    while recording the same real-time progress trace.
    """
    tracer = _TraceRun(model, path)
    with tracer:
        model.optimizeNogil()
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| import json | ||
| from random import randint | ||
|
|
||
| import pytest | ||
| from helpers.utils import bin_packing_model | ||
|
|
||
| from pyscipopt import SCIP_EVENTTYPE, Eventhdlr | ||
| from pyscipopt.recipes.realtime_trace_jsonl import optimizeNogilTrace, optimizeTrace | ||
|
|
||
|
|
||
# Run every test against both the blocking and the nogil optimize wrappers.
@pytest.fixture(
    params=[optimizeTrace, optimizeNogilTrace], ids=["optimize", "optimize_nogil"]
)
def optimize(request):
    """Parametrized fixture yielding the trace-enabled optimize function under test."""
    return request.param
|
|
||
|
|
||
def test_realtime_trace_in_memory(optimize):
    """Trace is recorded in model.data without clobbering existing user data."""
    model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50)
    model.setParam("limits/time", 5)

    # Pre-populate model.data to verify the recipe preserves user entries.
    model.data = {"test": True}

    optimize(model, path=None)

    assert "test" in model.data
    assert "trace" in model.data

    trace = model.data["trace"]
    types = [record["type"] for record in trace]

    # At least one progress event must have fired during the solve.
    assert ("bestsol_found" in types) or ("dualbound_improved" in types)

    # Every progress record carries the full snapshot.
    required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"}
    for record in trace:
        if record["type"] != "run_end":
            assert required_fields <= set(record.keys())

    # Minimization problem: primal bounds never increase over the run.
    primalbounds = [r["primalbound"] for r in trace if "primalbound" in r]
    for prev, cur in zip(primalbounds, primalbounds[1:]):
        assert cur <= prev

    # ...and dual bounds never decrease.
    dualbounds = [r["dualbound"] for r in trace if "dualbound" in r]
    for prev, cur in zip(dualbounds, dualbounds[1:]):
        assert cur >= prev

    assert "run_end" in types
|
|
||
|
|
||
def test_realtime_trace_file_output(optimize, tmp_path):
    """Each trace event is streamed to the JSONL file as one JSON object per line."""
    model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50)
    model.setParam("limits/time", 5)

    out_file = tmp_path / "trace.jsonl"

    optimize(model, path=str(out_file))

    assert out_file.exists()

    # Every line must parse as standalone JSON.
    records = [json.loads(line) for line in out_file.read_text().splitlines()]
    assert len(records) > 0

    # The final summary record is always present.
    assert "run_end" in [record["type"] for record in records]
|
|
||
|
|
||
class _InterruptOnBest(Eventhdlr):
    """Event handler that interrupts the solve as soon as a first incumbent is found."""

    def eventinit(self):
        # Subscribe to new-best-solution events.
        self.model.catchEvent(SCIP_EVENTTYPE.BESTSOLFOUND, self)

    def eventexec(self, event):
        # Stop the solve early; the trace must still record run_end afterwards.
        self.model.interruptSolve()
|
|
||
|
|
||
def test_optimize_with_trace_records_run_end_on_interrupt(optimize):
    """run_end is written even when the solve is interrupted mid-run."""
    model = bin_packing_model(
        sizes=[randint(1, 40) for _ in range(120)],
        capacity=50,
    )
    model.setParam("limits/time", 5)

    # Interrupt the solve as soon as the first incumbent appears.
    model.includeEventhdlr(_InterruptOnBest(), "stopper", "Interrupt on bestsol")

    optimize(model, path=None)

    recorded_types = {record["type"] for record in model.data["trace"]}
    assert "bestsol_found" in recorded_types
    assert "run_end" in recorded_types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a recipe marketed as “real-time JSONL streaming”, not flushing
`dualbound_improved` events can delay visibility for external consumers tailing the file. Consider flushing here as well (or making the flushing policy configurable), especially since `dualbound_improved` is one of the primary progress signals you record.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not Addressed
2.
dualbound_improvedflush policydualbound_improvedevents are intentionally not flushed:dualbound_improvedfires hundreds to thousands of times during optimization, whilebestsol_foundfires only a few dozen times at most; flushing on every dual bound update would accumulate significant I/O overheadDiscussion: I'm open to reconsidering the flush policy if there are use cases where immediate flushing of
dualbound_improvedevents is valuable (e.g., sub-minute monitoring). Would making it configurable be useful, or is the current approach acceptable?