diff --git a/.github/workflows/export.yml b/.github/workflows/export.yml
new file mode 100644
index 0000000..e0b3ed2
--- /dev/null
+++ b/.github/workflows/export.yml
@@ -0,0 +1,44 @@
+name: Mobile Export Parity
+
+# Heavy: installs tensorflow + coremltools. NOT in the default tests.yml
+# selection because (a) coremltools is a big install and (b) the parity
+# checks only matter when the export module or the model graph itself
+# changes. Runs on:
+#   - workflow_dispatch (manual sanity check before merging an export change)
+#   - PRs that touch the exporter, the model graph, the windowing schema, the
+#     pinned dependencies (tensorflow / coremltools), the parity test, or
+#     this workflow itself -- any of these can break the conversion paths
+
+on:
+  workflow_dispatch:
+  pull_request:
+    paths:
+      - 'openFlowML/export_mobile.py'
+      - 'openFlowML/model.py'
+      - 'openFlowML/windowing.py'
+      - 'openFlowML/requirements.txt'
+      - 'tests/test_export_parity.py'
+      - '.github/workflows/export.yml'
+
+# Read-only token: this job only checks out the repo and runs tests.
+permissions:
+  contents: read
+
+jobs:
+  parity:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: actions/setup-python@v5
+        with:
+          python-version: '3.11'
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r openFlowML/requirements.txt
+          pip install pytest
+
+      - name: Run mobile export parity tests
+        run: python -m pytest -v tests/test_export_parity.py
diff --git a/.github/workflows/ml_training.yml b/.github/workflows/ml_training.yml
index 2675d15..32eb6b3 100644
--- a/.github/workflows/ml_training.yml
+++ b/.github/workflows/ml_training.yml
@@ -21,10 +21,18 @@ on:
 jobs:
   train:
     runs-on: ubuntu-latest
+    # contents:write is needed for softprops/action-gh-release to tag and
+    # publish the trained model artifacts as a GitHub Release.
+    permissions:
+      contents: write
 
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
+        with:
+          # Fetch tags so the release-tag collision check ("does model-YYYY.MM.DD
+          # already exist?") can see existing release tags.
+          fetch-depth: 0
 
       - name: Set up Python
         uses: actions/setup-python@v5
@@ -52,18 +60,72 @@ jobs:
           OPENFLOW_DISABLE_RESERVOIR: ${{ inputs.disable_reservoir && '1' || '' }}
         run: python openFlowML/train.py
 
-      # The model is not usable for inference without the scaler parameters
-      # (to invert the log1p + z-score flow transform), the station + basin
-      # index maps (for the embedding lookups), and the training config (which
-      # records the feature schema). All five travel together.
+      - name: Compute release tag
+        id: tag
+        if: github.ref == 'refs/heads/main'
+        # Computed BEFORE the export so manifest.json's model_version can carry
+        # the same tag the release is published under. Use the UTC date as the
+        # base tag and suffix -rN on collision so a same-day re-run still
+        # produces a fresh release.
+        run: |
+          BASE_TAG="model-$(date -u +%Y.%m.%d)"
+          TAG="$BASE_TAG"
+          N=2
+          while git rev-parse "refs/tags/$TAG" >/dev/null 2>&1; do
+            TAG="${BASE_TAG}-r${N}"
+            N=$((N + 1))
+          done
+          echo "tag=$TAG" >> "$GITHUB_OUTPUT"
+
+      - name: Export mobile artifacts (CoreML + TFLite + manifest)
+        id: export
+        # Best-effort: a Phase 6 regression must NOT lose the trained .h5. The
+        # release step is gated on this step's success, so a failure here
+        # cleanly skips the release while still uploading the .h5 via the
+        # artifact step below.
+        continue-on-error: true
+        env:
+          # Same value as the release tag on main; off main no tag is computed,
+          # so fall back to a run-number version.
+          OPENFLOW_MODEL_VERSION: ${{ steps.tag.outputs.tag || format('model-{0}', github.run_number) }}
+        working-directory: openFlowML
+        run: python -m export_mobile --base-path "$GITHUB_WORKSPACE"
+
+      - name: Publish GitHub Release
+        if: github.ref == 'refs/heads/main' && steps.export.outcome == 'success'
+        uses: softprops/action-gh-release@v2
+        with:
+          tag_name: ${{ steps.tag.outputs.tag }}
+          name: Model ${{ steps.tag.outputs.tag }}
+          generate_release_notes: true
+          # The model is not usable for inference without scalers.json, the
+          # station + basin index maps, and training_config.json. All artifacts
+          # travel together; manifest.json carries sha256s + schema so the
+          # mobile app can verify what it downloaded. See docs/INFERENCE.md.
+ files: | + lstm_model.h5 + lstm_model.mlpackage.zip + lstm_model.tflite + scalers.json + station_index.json + basin_index.json + training_config.json + manifest.json + + # Belt-and-braces: even on release-step failure, the artifact upload + # keeps the trained model retrievable for debugging the release path. - name: Upload model and inference artifacts + if: always() uses: actions/upload-artifact@v4 with: name: lstm_model_keras_release path: | ./lstm_model.h5 + ./lstm_model.mlpackage.zip + ./lstm_model.tflite ./scalers.json ./station_index.json ./basin_index.json ./training_config.json - if-no-files-found: error + ./manifest.json + if-no-files-found: warn diff --git a/README.md b/README.md index a22874d..557bd6d 100644 --- a/README.md +++ b/README.md @@ -7,3 +7,4 @@ New here? See our wiki - [x] Automated dataset creation based on a USGS and CODWR station - [x] GitHub actions ML training +- [x] Mobile artifacts published per training run (CoreML + TFLite + manifest) — see [docs/INFERENCE.md](docs/INFERENCE.md) diff --git a/docs/INFERENCE.md b/docs/INFERENCE.md new file mode 100644 index 0000000..7413bf0 --- /dev/null +++ b/docs/INFERENCE.md @@ -0,0 +1,94 @@ +# OpenFlow Inference Contract + +This document specifies what the [OpenFlowMobile](https://github.com/tmart234/OpenFlowMobile) app (or any other downstream consumer) needs to know in order to run the trained model end-to-end. If you find yourself reading `train.py` to figure out a tensor shape, this doc has failed — please open an issue. + +## Release artifacts + +Every successful weekly training run on `main` produces a GitHub Release tagged `model-YYYY.MM.DD` (with a `-r2`, `-r3`, ... suffix on same-day re-runs). The release attaches: + +| File | Purpose | +| --- | --- | +| `lstm_model.h5` | Canonical Keras model. The source of truth; everything else is derived. | +| `lstm_model.mlpackage.zip` | CoreML mlprogram for iOS (iOS 15+). Unzip and pass to `MLModel(contentsOf:)`. 
| +| `lstm_model.tflite` | TFLite for Android. May require the Flex delegate — see TFLite note below. | +| `scalers.json` | Per-column scaler params (mean, scale, transform). Inputs and outputs are in scaled space; you invert with this. | +| `station_index.json` | Map `site_id` → embedding index. Index `0` is reserved for "unseen". | +| `basin_index.json` | Map HUC8 → embedding index. Index `0` is reserved for "unseen". | +| `training_config.json` | Schema: encoder/decoder window sizes, feature names, vocab sizes. | +| `manifest.json` | sha256 + byte size per artifact, plus tool versions and the schema block. Verify on download. | + +### Verifying a downloaded release + +```python +import hashlib, json +with open("manifest.json") as f: + manifest = json.load(f) +for entry in manifest["files"]: + h = hashlib.sha256() + with open(entry["name"], "rb") as f: + for chunk in iter(lambda: f.read(1 << 20), b""): + h.update(chunk) + assert h.hexdigest() == entry["sha256"], entry["name"] +``` + +## Input tensors + +The model takes a dict of 5 inputs. Names are stable across exports: + +| Name | dtype | Shape | Source | +| --- | --- | --- | --- | +| `encoder_input` | float32 | `[1, 60, len(encoder_features)]` | Last 60 days of features, columns in `training_config.encoder_features` order, pre-scaled. | +| `decoder_input` | float32 | `[1, 14, len(decoder_features)]` | Next 14 days of forecast-time-available features, columns in `training_config.decoder_features` order, pre-scaled. | +| `persistence_input` | float32 | `[1, len(target_features)]` | Last encoder-day flow values (the `Min Flow` / `Max Flow` columns from the last encoder row), in scaled space. | +| `station_input` | int32 | `[1]` | `station_index.json[site_id]`, or `0` if the site isn't in the map. | +| `basin_input` | int32 | `[1]` | `basin_index.json[huc8]`, or `0` if the HUC isn't in the map. | + +`encoder_days = 60`, `decoder_days = 14`. 
Both are recorded in `training_config.json` so a future model with a different window size doesn't silently break clients. + +### Scaling inputs + +Every numeric column in `encoder_input` and `decoder_input` is z-scored; the flow columns (`Min Flow`, `Max Flow`) are additionally `log1p`-transformed before scaling. Apply scalers per-column: + +```python +import math + +# scalers.json shape: +# {"Min Flow": {"mean": ..., "scale": ..., "transform": "log1p"}, ...} +def scale(raw, params): + x = math.log1p(raw) if params["transform"] == "log1p" else raw + return (x - params["mean"]) / params["scale"] +``` + +Indicator columns (`sm_observed`, `reservoir_observed`) are 0/1 and are **not** scaled — they don't appear in `scalers.json`. + +## Output tensor + +One output, name `persistence_plus_delta`, shape `[1, 14, len(target_features)]` (= `[1, 14, 2]`), dtype `float32`. The two target columns are `Min Flow`, `Max Flow` in that order — same as `training_config.target_features`. + +The output is in **scaled** space. To get cfs, invert per column: + +```python +import math + +def invert(z, params): + x = z * params["scale"] + params["mean"] + return math.expm1(x) if params["transform"] == "log1p" else x +``` + +## Residual structure + +The model is a residual forecaster: `output = persistence_input + delta`, where `delta` is what the network actually learns. If the learned `delta` is zero, predictions equal the persistence baseline — that's the worst case, not a failure mode. There's no special-casing for this in the consumer; just decode the output as above. + +## TFLite Flex delegate + +The graph contains two `Embedding(mask_zero=True)` layers and an LSTM with `initial_state`. The exporter tries three TFLite conversion paths and records which one was used in `manifest.json.tflite_mode`: + +- `builtins` — strict TFLITE_BUILTINS. No Flex needed on the consumer side. +- `select_tf_ops` — needed the Flex delegate.
Android consumers must add `org.tensorflow:tensorflow-lite-select-tf-ops` to their `app/build.gradle`. iOS uses CoreML, not TFLite. +- `rebuilt_no_mask` — exporter rebuilt the model with `mask_zero=False` and copied weights. Functionally identical at inference (the mask only affected training-time padding behavior, which the model doesn't use). No Flex needed. + +Check `manifest.tflite_mode` on download and warn the user if the Flex delegate isn't bundled. + +## Unseen stations and basins + +Both embedding layers reserve index `0` for unknown. If the user picks a site not in `station_index.json`, pass `station_input = 0` — the model is trained with `StationDropout` to handle this via basin-only inference. Same for `basin_input` when the HUC8 isn't known. + +## Reference fixture + +The export parity tests in `tests/test_export_parity.py` generate synthetic inputs, run them through Keras / CoreML / TFLite, and assert they agree within `1e-3`. They're the executable spec of this document — when in doubt, read that file. diff --git a/openFlowML/data/cbrfc_lid_map.json b/openFlowML/data/cbrfc_lid_map.json new file mode 100644 index 0000000..e459cc2 --- /dev/null +++ b/openFlowML/data/cbrfc_lid_map.json @@ -0,0 +1,3 @@ +{ + "_comment": "Map of OpenFlow site_id (USGS:XXXX or DWR:XXXX) to NWS / AHPS 5-letter LID. Used by get_cbrfc.py for both live AHPS forecast retrieval and NWPS historical forecast archive lookup. Add a row when wiring CBRFC for a new gauge; a missing site_id silently skips the CBRFC baseline for that station (it remains a fully optional comparison baseline). Find the LID for a USGS site by searching https://water.weather.gov/ahps2/ for the gauge name and reading the LID from the page URL." 
+} diff --git a/openFlowML/data/get_cbrfc.py b/openFlowML/data/get_cbrfc.py index 4c7c80d..6d2ed93 100644 --- a/openFlowML/data/get_cbrfc.py +++ b/openFlowML/data/get_cbrfc.py @@ -16,27 +16,23 @@ None when CBRFC coverage is sparse enough that the comparison would be meaningless. -IMPLEMENTATION STATUS: - The CBRFC publishes operational deterministic forecasts via the - Advanced Hydrologic Prediction Service (AHPS) at water.weather.gov, and - Ensemble Streamflow Prediction (ESP) products through their own portal at - cbrfc.noaa.gov. Both have *current-day* access; the **historical archive** - needed for backtesting against our test-set anchor dates is the gap: - - - AHPS does not expose historical issuance via its REST API; the - archived forecasts live in tarballs at - https://water.weather.gov/ahps/download.php - - CBRFC's ESP archive is accessible per-basin via their THREDDS server - but requires a per-issuance lookup that is meaningfully more involved - than this stub. - - Filling either path in (the obvious follow-up commit) lets the baseline - actually evaluate against the test set. Until then, fetch() returns empty - for any anchor_date != today, and baseline_predictions() returns None so - train.py skips the comparison cleanly. +Data sources: + - Live (anchor_date >= today): AHPS public hydrograph XML at water.weather.gov. + The page only exposes the current issuance, so this is the "what does CBRFC + think tomorrow's flow is right now" lookup. + - Historical (anchor_date < today): NWS NWPS forecast archive at + api.water.noaa.gov, which accepts a `reference_time` query parameter and + returns the deterministic stage/flow forecast issued at that timestamp. + This is the path used for backtesting against test-set anchor dates. + +LID mapping (OpenFlow site_id -> NWS LID) is curated in cbrfc_lid_map.json +alongside this file. A missing mapping is a silent skip -- CBRFC is an +optional comparison baseline, not a training input. 
""" +import json import logging +import os from datetime import date, datetime, timedelta from typing import List, Optional @@ -51,9 +47,32 @@ # AHPS public site forecast page (one page per gauge by NWS LID). AHPS_FORECAST_URL = "https://water.weather.gov/ahps2/hydrograph_to_xml.php" +# NWPS forecast archive: accepts a historical `reference_time` and returns the +# deterministic stage/flow forecast issued at that timestamp as JSON. +NWPS_FORECAST_URL = "https://api.water.noaa.gov/nwps/v1/gauges/{lid}/stageflow/forecast" # Default 14-day horizon matches windowing.DECODER_DAYS. DEFAULT_HORIZON_DAYS = 14 +_LID_MAP_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), + 'cbrfc_lid_map.json') + + +def _load_lid_table() -> dict: + """Load site_id -> NWS LID from disk. Cached on the function.""" + cached = getattr(_load_lid_table, '_cache', None) + if cached is not None: + return cached + try: + with open(_LID_MAP_PATH) as f: + raw = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logger.warning("Could not read %s (%s); CBRFC baseline disabled", _LID_MAP_PATH, e) + raw = {} + # Drop comment / metadata keys (anything starting with underscore). + table = {k: v for k, v in raw.items() if not k.startswith('_')} + _load_lid_table._cache = table + return table + def _empty() -> pd.DataFrame: return pd.DataFrame(columns=['Date', 'cbrfc_flow']) @@ -68,20 +87,8 @@ def _to_date(d) -> date: def _ahps_lid_for_site(site_id: str) -> Optional[str]: - """ - Map a USGS / DWR site id to an NWS / AHPS LID (5-letter location id). - The mapping isn't algorithmic; it's a curated lookup. Returns None if no - LID is known for the site, in which case fetch() returns empty. - - Populate this when wiring CBRFC for a specific gauge: - {'USGS:09163500': 'CRSC2', ...} - """ - return _AHPS_LID_TABLE.get(site_id) - - -_AHPS_LID_TABLE: dict = { - # site_id -> NWS LID. Empty by default; fill in per gauge as needed. 
-}
+    """Map an OpenFlow site_id (USGS:XXXX or DWR:XXXX) to its NWS LID, or None."""
+    return _load_lid_table().get(site_id)
 
 
 def fetch_current(site_id: str, horizon_days: int = DEFAULT_HORIZON_DAYS) -> pd.DataFrame:
@@ -92,7 +99,7 @@ def fetch_current(site_id: str, horizon_days: int = DEFAULT_HORIZON_DAYS) -> pd.
 
     The AHPS public forecast page only exposes the current forecast issuance,
     so this is the "what does CBRFC think tomorrow's flow is right now"
-    helper. Historical issuances need the archive (see module docstring).
+    helper. Historical issuances use the NWPS archive (_fetch_nwps_historical).
     """
     lid = _ahps_lid_for_site(site_id)
     if not lid:
@@ -150,23 +157,100 @@ def _parse_ahps_forecast_xml(text: str, horizon_days: int) -> List[tuple]:
     return list(daily.itertuples(index=False, name=None))
 
 
+def _fetch_nwps_historical(lid: str, anchor: date,
+                           horizon_days: int) -> List[tuple]:
+    """
+    Pull the CBRFC forecast issued at `anchor` from the NWPS archive.
+
+    Args:
+        lid: NWS location id of the gauge (see cbrfc_lid_map.json).
+        anchor: issuance date to request.
+        horizon_days: number of forecast days to keep after `anchor`.
+
+    Returns:
+        A list of (YYYY-MM-DD, cfs) tuples, sorted by date, restricted to the
+        `horizon_days` calendar days strictly after `anchor`. Empty on any
+        error (no response, non-JSON body, unexpected payload shape, no
+        forecast inside that window). Sub-daily values are collapsed to
+        daily mean to match the persistence / model output cadence.
+    """
+    url = NWPS_FORECAST_URL.format(lid=lid)
+    # NWPS issues its deterministic forecast around 12Z; align the reference
+    # time there so any anchor lands on a real issuance.
+    reference_time = f"{anchor.strftime('%Y-%m-%d')}T12:00:00Z"
+    params = {'reference_time': reference_time}
+    response = data_utils.request_with_retry(url, params=params)
+    if response is None:
+        return []
+    try:
+        payload = response.json()
+    except ValueError:
+        logger.debug("NWPS returned non-JSON for %s @ %s", lid, anchor)
+        return []
+    # NWPS schema: {"data": [{"validTime": "...", "primary": "..."}, ...]}.
+    # The wrapper key drifts between schema versions; tolerate both shapes.
+    # Anything else (top-level list/null, null "forecast") means "no
+    # forecast" rather than an AttributeError out of the baseline.
+    data = None
+    if isinstance(payload, dict):
+        data = payload.get('data')
+        if not data:
+            forecast = payload.get('forecast')
+            data = forecast.get('data') if isinstance(forecast, dict) else None
+    if not isinstance(data, list) or not data:
+        return []
+    # Keep only (anchor, anchor + horizon_days]: this drops the partial
+    # issuance day and, should the service ignore reference_time and hand
+    # back some other issuance, stops its dates from being scored against
+    # this anchor.
+    first_day = anchor + timedelta(days=1)
+    last_day = anchor + timedelta(days=horizon_days)
+    rows = []
+    for point in data:
+        if not isinstance(point, dict):
+            continue
+        valid = point.get('validTime') or point.get('valid')
+        # NOTE(review): assumes `primary` is flow in cfs for these gauges --
+        # confirm against the NWPS payload's units metadata.
+        primary = point.get('primary')
+        if not valid or primary is None:
+            continue
+        try:
+            d = datetime.fromisoformat(str(valid).replace('Z', '+00:00')).date()
+            v = float(primary)
+        except (ValueError, TypeError):
+            continue
+        if first_day <= d <= last_day:
+            rows.append((d.strftime('%Y-%m-%d'), v))
+    if not rows:
+        return []
+    df = pd.DataFrame(rows, columns=['Date', 'cbrfc_flow'])
+    daily = df.groupby('Date', as_index=False)['cbrfc_flow'].mean()
+    daily = daily.sort_values('Date')
+    return list(daily.itertuples(index=False, name=None))
+
+
 def fetch(site_id: str, anchor_date, horizon_days: int = DEFAULT_HORIZON_DAYS) -> pd.DataFrame:
     """
     CBRFC forecast for `site_id` issued on or before `anchor_date`, for the
     `horizon_days` days following the anchor.
 
-    For anchor_date == today, this is equivalent to fetch_current. For any
-    historical anchor_date, the AHPS API does not expose the issuance; this
-    returns empty until the historical archive integration lands (see module
-    docstring).
+    For anchor_date >= today, returns the current AHPS issuance (live path).
+    For any historical anchor_date, queries the NWPS forecast archive at the
+    matching reference_time and returns the issuance from that day.
+ """ anchor = _to_date(anchor_date) if anchor >= date.today(): return fetch_current(site_id, horizon_days=horizon_days) - logger.debug("CBRFC historical forecast for %s @ %s requires archive integration", - site_id, anchor) - return _empty() + lid = _ahps_lid_for_site(site_id) + if not lid: + logger.info("No AHPS LID mapped for %s; CBRFC historical skipped", site_id) + return _empty() + rows = _fetch_nwps_historical(lid, anchor, horizon_days) + if not rows: + return _empty() + return pd.DataFrame(rows, columns=['Date', 'cbrfc_flow']) def baseline_predictions(test_samples) -> Optional[np.ndarray]: @@ -182,8 +241,6 @@ def baseline_predictions(test_samples) -> Optional[np.ndarray]: """ if not test_samples: return None - # Until the historical CBRFC archive is wired in there's nothing to - # backtest against; signal "no comparison" cleanly to the caller. found = 0 horizon = test_samples[0].target_Y.shape[0] rows = [] diff --git a/openFlowML/export_mobile.py b/openFlowML/export_mobile.py new file mode 100644 index 0000000..44a4e96 --- /dev/null +++ b/openFlowML/export_mobile.py @@ -0,0 +1,294 @@ +""" +Phase 6: convert the trained Keras model into mobile-friendly formats. + +Produces, alongside the canonical lstm_model.h5: + - lstm_model.mlpackage(.zip) -- CoreML mlprogram for iOS (iOS15+) + - lstm_model.tflite -- TFLite for Android + - manifest.json -- sha256s + schema + tool versions so the + mobile app can verify what it downloaded + +Can be invoked standalone after training: + + python -m export_mobile --base-path . + +The training workflow runs this as a separate best-effort step after train.py +(train.py itself does not call it); a failed export must NOT lose the trained +.h5, so the workflow runs this with continue-on-error and the release step is +gated on its success. + +The single highest-risk piece is TFLite conversion of the dual +Embedding(mask_zero=True) + LSTM-with-initial_state graph. We try three paths +in order and log which one wins: + 1. Strict TFLITE_BUILTINS. + 2.
TFLITE_BUILTINS + SELECT_TF_OPS (Flex delegate; needs select-tf-ops AAR). + 3. Rebuild the model with mask_zero=False and copy weights; mask is unused + at inference because unseen-station fallback maps to index 0 explicitly. +""" + +import argparse +import hashlib +import json +import logging +import os +import shutil +import sys +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) +if not logging.getLogger().hasHandlers(): + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +H5_NAME = 'lstm_model.h5' +MLPACKAGE_NAME = 'lstm_model.mlpackage' +MLPACKAGE_ZIP_NAME = 'lstm_model.mlpackage.zip' +TFLITE_NAME = 'lstm_model.tflite' +MANIFEST_NAME = 'manifest.json' +TRAINING_CONFIG_NAME = 'training_config.json' + + +def _load_keras_model(h5_path): + import tensorflow as tf + import model as model_mod + return tf.keras.models.load_model( + h5_path, custom_objects={'StationDropout': model_mod.StationDropout}) + + +def _input_specs_from_config(config): + """TensorSpecs for the five model inputs, in the order build_encoder_decoder declares them.""" + import tensorflow as tf + return [ + tf.TensorSpec(shape=(1, config['encoder_days'], len(config['encoder_features'])), + dtype=tf.float32, name='encoder_input'), + tf.TensorSpec(shape=(1, config['decoder_days'], len(config['decoder_features'])), + dtype=tf.float32, name='decoder_input'), + tf.TensorSpec(shape=(1, len(config['target_features'])), + dtype=tf.float32, name='persistence_input'), + tf.TensorSpec(shape=(1,), dtype=tf.int32, name='station_input'), + tf.TensorSpec(shape=(1,), dtype=tf.int32, name='basin_input'), + ] + + +def _make_serving_fn(net): + """Wrap the dict-input Keras model in a positional tf.function for converter consumption.""" + import tensorflow as tf + + @tf.function + def serving_fn(encoder_input, decoder_input, persistence_input, station_input, basin_input): + return net({ + 'encoder_input': encoder_input, + 'decoder_input': 
decoder_input,
+            'persistence_input': persistence_input,
+            'station_input': station_input,
+            'basin_input': basin_input,
+        }, training=False)
+
+    return serving_fn
+
+
+def export_coreml(h5_path, out_dir):
+    """
+    Convert Keras .h5 -> CoreML .mlpackage (mlprogram), then zip the directory
+    for release upload. Returns the path to the .zip on success, None on
+    failure (so callers can degrade gracefully).
+
+    "Failure" covers a missing coremltools install, a conversion error, and
+    an error while saving or zipping the package.
+    """
+    try:
+        import coremltools as ct
+    except ImportError:
+        logger.error("coremltools not installed; skipping CoreML export")
+        return None
+
+    net = _load_keras_model(h5_path)
+    try:
+        ml_model = ct.convert(
+            net,
+            source='tensorflow',
+            convert_to='mlprogram',
+            minimum_deployment_target=ct.target.iOS15,
+        )
+    except Exception as e:
+        logger.error("CoreML conversion failed: %s", e)
+        return None
+
+    mlpackage_path = os.path.join(out_dir, MLPACKAGE_NAME)
+    # Saving / zipping can fail too (disk, permissions); honor the "None on
+    # failure" contract so the TFLite export still gets its turn.
+    try:
+        if os.path.exists(mlpackage_path):
+            shutil.rmtree(mlpackage_path, ignore_errors=True)
+        ml_model.save(mlpackage_path)
+
+        # .mlpackage is a directory; release uploads need a single file.
+        zip_base = os.path.join(out_dir, MLPACKAGE_NAME)
+        zip_path = shutil.make_archive(zip_base, 'zip', root_dir=out_dir, base_dir=MLPACKAGE_NAME)
+    except Exception as e:
+        logger.error("CoreML packaging failed: %s", e)
+        return None
+    logger.info("Wrote CoreML mlpackage: %s (zipped: %s)", mlpackage_path, zip_path)
+    return zip_path
+
+
+def _rebuild_without_mask_zero(net, config):
+    """
+    Clone `net` with every Embedding's mask_zero switched off, then copy weights.
+
+    The clone is derived from `net`'s own layer configs rather than from
+    build_encoder_decoder defaults, so hyperparameters that training_config.json
+    does not record (e.g. lstm_units, embedding_dim) always match and the
+    positional weight copy cannot hit a shape mismatch.
+
+    Args:
+        net: the loaded Keras model.
+        config: training config dict; unused, kept so existing callers keep
+            working.
+
+    Returns:
+        A new Keras model with the same architecture and weights as `net`,
+        except that its Embedding layers are built with mask_zero=False.
+    """
+    import tensorflow as tf
+
+    def _clone_layer(layer):
+        layer_config = layer.get_config()
+        if isinstance(layer, tf.keras.layers.Embedding):
+            layer_config['mask_zero'] = False
+        return layer.__class__.from_config(layer_config)
+
+    rebuilt = tf.keras.models.clone_model(net, clone_function=_clone_layer)
+    # Model.set_weights matches by position, not by name; the clone keeps
+    # net's layer order, so the two flat weight lists line up.
+    rebuilt.set_weights(net.get_weights())
+    return rebuilt
+
+
+def export_tflite(h5_path, out_path, config):
+    """
+    Convert Keras .h5 -> TFLite. Returns (out_path, mode) on success or
+    (None, None) on failure. `mode` is one of 'builtins', 'select_tf_ops',
+    'rebuilt_no_mask' so we can record which path won in the manifest.
+    """
+    import tensorflow as tf
+
+    net = _load_keras_model(h5_path)
+    input_specs = _input_specs_from_config(config)
+
+    def _try_convert(target_net, supported_ops, label):
+        # Tracing and converter construction sit inside the try as well: a
+        # failure there must fall through to the next tier, not abort export.
+        try:
+            serving_fn = _make_serving_fn(target_net)
+            concrete = serving_fn.get_concrete_function(*input_specs)
+            converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete], target_net)
+            converter.target_spec.supported_ops = supported_ops
+            tflite_bytes = converter.convert()
+        except Exception as e:
+            logger.warning("TFLite conversion (%s) failed: %s", label, e)
+            return None
+        with open(out_path, 'wb') as f:
+            f.write(tflite_bytes)
+        logger.info("Wrote TFLite (%s): %s (%d bytes)", label, out_path, len(tflite_bytes))
+        return out_path
+
+    # Tier 1: strict builtins.
+    result = _try_convert(net, [tf.lite.OpsSet.TFLITE_BUILTINS], 'builtins')
+    if result:
+        return result, 'builtins'
+    # Tier 2: builtins + Flex delegate.
+    result = _try_convert(net,
+                          [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS],
+                          'select_tf_ops')
+    if result:
+        return result, 'select_tf_ops'
+    # Tier 3: rebuild without mask_zero, retry with strict builtins.
+    try:
+        rebuilt = _rebuild_without_mask_zero(net, config)
+    except Exception as e:
+        # A failed rebuild (e.g. weight-shape mismatch) is just a failed tier;
+        # report "no TFLite" so export_all can still ship CoreML + manifest.
+        logger.warning("TFLite conversion (rebuilt_no_mask) failed during rebuild: %s", e)
+        return None, None
+    result = _try_convert(rebuilt, [tf.lite.OpsSet.TFLITE_BUILTINS], 'rebuilt_no_mask')
+    if result:
+        return result, 'rebuilt_no_mask'
+    return None, None
+
+
+def _sha256(path):
+    """Return the hex sha256 of the file at `path`, read in 1 MiB chunks."""
+    h = hashlib.sha256()
+    with open(path, 'rb') as f:
+        for chunk in iter(lambda: f.read(1 << 20), b''):
+            h.update(chunk)
+    return h.hexdigest()
+
+
+def write_manifest(base_path, config, artifact_files, tflite_mode):
+    """
+    Emit manifest.json next to the artifacts. The mobile app reads this to
+    verify integrity, learn the input/output schema, and pick which artifact
+    to use without parsing training_config.json.
+
+    Args:
+        base_path: directory holding the artifacts; manifest.json is written here.
+        config: training config dict (source of the `schema` block).
+        artifact_files: file names relative to `base_path`; names that do not
+            exist on disk are skipped rather than listed.
+        tflite_mode: which TFLite conversion tier produced the .tflite, or None.
+
+    Returns:
+        Path of the written manifest.json.
+    """
+    import tensorflow as tf
+    try:
+        import coremltools as ct
+        coremltools_version = ct.__version__
+    except ImportError:
+        coremltools_version = None
+
+    files = []
+    for name in artifact_files:
+        path = os.path.join(base_path, name)
+        if not os.path.exists(path):
+            continue
+        files.append({
+            'name': name,
+            'sha256': _sha256(path),
+            'bytes': os.path.getsize(path),
+        })
+
+    # One clock read so model_version's date fallback and created_utc cannot
+    # disagree across a UTC midnight.
+    now_utc = datetime.now(timezone.utc)
+    manifest = {
+        # `or` rather than a .get default: an OPENFLOW_MODEL_VERSION that is
+        # set but empty also falls back to the UTC date tag.
+        'model_version': (os.environ.get('OPENFLOW_MODEL_VERSION')
+                          or now_utc.strftime('model-%Y.%m.%d')),
+        'created_utc': now_utc.strftime('%Y-%m-%dT%H:%M:%SZ'),
+        'tf_version': tf.__version__,
+        'coremltools_version': coremltools_version,
+        'tflite_mode': tflite_mode,
+        'schema': {
+            'encoder_days': config['encoder_days'],
+            'decoder_days': config['decoder_days'],
+            'encoder_features': config['encoder_features'],
+            'decoder_features': config['decoder_features'],
+            'target_features': config['target_features'],
+            'num_stations': config['num_stations'],
+            'num_basins': config['num_basins'],
+        },
+        'files': files,
+    }
+    out_path = os.path.join(base_path, MANIFEST_NAME)
+    with open(out_path, 'w') as f:
+        json.dump(manifest, f, indent=2)
+    logger.info("Wrote manifest: %s (%d files)", out_path, len(files))
+    return out_path
+
+
+def export_all(base_path):
+    """End-to-end: read training_config + .h5, emit
mlpackage.zip + tflite + manifest.""" + h5_path = os.path.join(base_path, H5_NAME) + config_path = os.path.join(base_path, TRAINING_CONFIG_NAME) + if not os.path.exists(h5_path): + raise FileNotFoundError(f"Missing {h5_path} -- run train.py first") + if not os.path.exists(config_path): + raise FileNotFoundError(f"Missing {config_path} -- run train.py first") + with open(config_path) as f: + config = json.load(f) + + mlpackage_zip = export_coreml(h5_path, base_path) + tflite_path = os.path.join(base_path, TFLITE_NAME) + tflite_result, tflite_mode = export_tflite(h5_path, tflite_path, config) + + artifact_files = [H5_NAME, 'scalers.json', 'station_index.json', + 'basin_index.json', TRAINING_CONFIG_NAME] + if mlpackage_zip: + artifact_files.append(MLPACKAGE_ZIP_NAME) + if tflite_result: + artifact_files.append(TFLITE_NAME) + write_manifest(base_path, config, artifact_files, tflite_mode) + + if not mlpackage_zip and not tflite_result: + raise RuntimeError("Both CoreML and TFLite exports failed") + return { + 'mlpackage_zip': mlpackage_zip, + 'tflite': tflite_result, + 'tflite_mode': tflite_mode, + } + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--base-path', default=None, + help='Directory containing lstm_model.h5 and training_config.json. ' + 'Defaults to the repo root (combine_data.get_base_path()).') + args = parser.parse_args() + if args.base_path is None: + import combine_data + args.base_path = combine_data.get_base_path() + result = export_all(args.base_path) + logger.info("Export complete: %s", result) + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/openFlowML/requirements.txt b/openFlowML/requirements.txt index 85fbbfa..6dd442e 100644 --- a/openFlowML/requirements.txt +++ b/openFlowML/requirements.txt @@ -11,3 +11,7 @@ scikit-learn>=1.3.2 numpy>=1.23,<1.25 scipy>=1.5.0 tensorflow==2.13 +# Phase 6: Keras -> CoreML conversion for the iOS mobile app. 
7.x is the
+# known-good range against tensorflow 2.13 + python 3.11; 8.x drops support
+# for tf 2.13. TFLite ships with tensorflow itself, no extra dep.
+coremltools>=7.1,<8.0
diff --git a/openFlowML/train.py b/openFlowML/train.py
index 69a265f..063ec5a 100644
--- a/openFlowML/train.py
+++ b/openFlowML/train.py
@@ -41,9 +41,9 @@ def _save_keras_model(model_obj, base_path):
     """
     Save the trained model.
 
-    Keras 2.13's preferred format is still .h5; we keep the .h5 for the
-    existing iOS conversion path (Phase 6) but also write the native .keras
-    format as the modern artifact.
+    Keras 2.13's preferred format is still .h5; the .h5 is the canonical
+    training output and the source of truth for Phase 6 mobile conversion
+    (CoreML + TFLite). See docs/INFERENCE.md.
     """
     h5_path = os.path.join(base_path, 'lstm_model.h5')
     model_obj.save(h5_path)
@@ -204,6 +204,10 @@ def main():
         json.dump(config, f, indent=2)
     logger.info("Saved model -> %s, training_config.json alongside scalers/index JSON",
                 h5_path)
+    # Phase 6 mobile conversion (CoreML + TFLite + manifest) is run as a
+    # separate workflow step against this same .h5 so that a conversion
+    # regression cannot lose the trained model. See export_mobile.py and
+    # the "Export mobile artifacts" step in ml_training.yml.
 
 
 if __name__ == '__main__':
diff --git a/tests/test_export_parity.py b/tests/test_export_parity.py
new file mode 100644
index 0000000..320d916
--- /dev/null
+++ b/tests/test_export_parity.py
@@ -0,0 +1,185 @@
+"""
+Parity tests for the Phase 6 mobile export.
+
+Trains a tiny seq2seq model, exports it to CoreML + TFLite, and verifies that:
+  - both formats load in their respective runtimes
+  - per-sample predictions match the Keras .h5 within tolerance
+    (the CoreML prediction check needs macOS and is skipped elsewhere)
+  - manifest.json sha256s match files on disk
+  - the schema block in manifest.json reflects the training config
+
+This is heavy (requires tensorflow + coremltools), so it's NOT in the default
+`tests.yml` selection -- it runs in the opt-in export.yml workflow. Locally,
+invoke with: pytest tests/test_export_parity.py
+"""
+
+import hashlib
+import json
+import os
+import sys
+
+import numpy as np
+import pytest
+
+pytest.importorskip('tensorflow')
+pytest.importorskip('coremltools')
+
+os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')
+
+import model as model_mod
+import windowing
+import export_mobile
+
+
+def _build_tiny_model():
+    """Build a small encoder-decoder model with every dropout rate set to 0.0."""
+    return model_mod.build_encoder_decoder(
+        num_stations=3,
+        num_basins=2,
+        encoder_features=len(windowing.ENCODER_FEATURES),
+        decoder_features=len(windowing.DECODER_FEATURES),
+        target_features=len(windowing.TARGET_FEATURES),
+        encoder_days=windowing.ENCODER_DAYS,
+        decoder_days=windowing.DECODER_DAYS,
+        lstm_units=8,
+        embedding_dim=4,
+        station_dropout=0.0,  # inference-time parity: no stochasticity at all
+        dropout=0.0,
+        recurrent_dropout=0.0,
+    )
+
+
+def _write_training_config(base_path, num_stations=3, num_basins=2):
+    """Write training_config.json into base_path and return the dict that was written."""
+    config = {
+        'encoder_days': windowing.ENCODER_DAYS,
+        'decoder_days': windowing.DECODER_DAYS,
+        'encoder_features': windowing.ENCODER_FEATURES,
+        'decoder_features': windowing.DECODER_FEATURES,
+        'target_features': windowing.TARGET_FEATURES,
+        'num_stations': num_stations,
+        'num_basins': num_basins,
+    }
+    with open(os.path.join(base_path, 'training_config.json'), 'w') as f:
+        json.dump(config, f)
+    return config
+
+
+def _seed_index_files(base_path):
+    """The exporter doesn't read these, but write them for manifest realism."""
+    for name in ('scalers.json', 'station_index.json', 'basin_index.json'):
+        with open(os.path.join(base_path, name), 'w') as f:
+            json.dump({}, f)
+
+
+def _sample_inputs(n=1):
+    """Return n samples per model input: seeded (rng seed 0) normal floats, index 1 for station/basin."""
+    rng = np.random.default_rng(0)
+    return {
+        'encoder_input': rng.standard_normal(
+            (n, windowing.ENCODER_DAYS, len(windowing.ENCODER_FEATURES))).astype('float32'),
+        'decoder_input': rng.standard_normal(
+            (n, windowing.DECODER_DAYS, len(windowing.DECODER_FEATURES))).astype('float32'),
+        'persistence_input': rng.standard_normal(
+            (n, len(windowing.TARGET_FEATURES))).astype('float32'),
+        'station_input': np.array([1] * n, dtype='int32'),
+        'basin_input': np.array([1] * n, dtype='int32'),
+    }
+
+
+def test_export_round_trips_with_parity(tmp_path):
+    """Export a tiny model, then compare TFLite (and, on macOS, CoreML) predictions with Keras."""
+    base_path = str(tmp_path)
+    net = _build_tiny_model()
+    h5_path = os.path.join(base_path, 'lstm_model.h5')
+    net.save(h5_path)
+    _write_training_config(base_path)
+    _seed_index_files(base_path)
+
+    result = export_mobile.export_all(base_path)
+
+    # Both formats made it to disk.
+    assert result['mlpackage_zip'] and os.path.exists(result['mlpackage_zip'])
+    assert result['tflite'] and os.path.exists(result['tflite'])
+
+    # Keras reference predictions.
+    keras_inputs = _sample_inputs(n=4)
+    keras_pred = net.predict(keras_inputs, verbose=0)
+
+    # TFLite parity (one sample at a time -- fixed batch=1 in input specs).
+    import tensorflow as tf
+    interp = tf.lite.Interpreter(model_path=result['tflite'])
+    interp.allocate_tensors()
+    in_details = {d['name'].split(':')[0]: d for d in interp.get_input_details()}
+    out_details = interp.get_output_details()
+    for i in range(keras_pred.shape[0]):
+        for name in ('encoder_input', 'decoder_input', 'persistence_input',
+                     'station_input', 'basin_input'):
+            # Names may carry serving prefixes ("serving_default_encoder_input")
+            # depending on TF version; match by suffix.
+            matched = next((v for k, v in in_details.items() if k.endswith(name)), None)
+            assert matched is not None, f"TFLite input {name} not found in {list(in_details)}"
+            interp.set_tensor(matched['index'], keras_inputs[name][i:i+1])
+        interp.invoke()
+        tflite_pred = interp.get_tensor(out_details[0]['index'])
+        assert np.max(np.abs(tflite_pred - keras_pred[i:i+1])) < 1e-3
+
+    # CoreML parity. MLModel.predict() needs the Core ML framework, which only
+    # exists on macOS; on other platforms (export.yml runs on ubuntu-latest) it
+    # raises. The export and TFLite checks above have already run, so only the
+    # CoreML prediction comparison is skipped there.
+    if sys.platform != 'darwin':
+        pytest.skip("CoreML prediction requires macOS; export + TFLite parity already verified")
+    import coremltools as ct
+    ct_model = ct.models.MLModel(os.path.join(base_path, export_mobile.MLPACKAGE_NAME))
+    for i in range(keras_pred.shape[0]):
+        ct_inputs = {name: keras_inputs[name][i:i+1] for name in keras_inputs}
+        ct_out = ct_model.predict(ct_inputs)
+        # CoreML names the output after the Add layer; pick the single output.
+        coreml_pred = next(iter(ct_out.values()))
+        assert np.max(np.abs(np.asarray(coreml_pred) - keras_pred[i:i+1])) < 1e-3
+
+
+def test_manifest_records_correct_sha256s(tmp_path):
+    """Every manifest entry must match the sha256 and byte size of the file on disk."""
+    base_path = str(tmp_path)
+    net = _build_tiny_model()
+    h5_path = os.path.join(base_path, 'lstm_model.h5')
+    net.save(h5_path)
+    _write_training_config(base_path)
+    _seed_index_files(base_path)
+
+    export_mobile.export_all(base_path)
+
+    with open(os.path.join(base_path, 'manifest.json')) as f:
+        manifest = json.load(f)
+
+    # Every file the manifest lists must exist and hash to the recorded sha.
+    for entry in manifest['files']:
+        path = os.path.join(base_path, entry['name'])
+        assert os.path.exists(path)
+        h = hashlib.sha256()
+        with open(path, 'rb') as f:
+            for chunk in iter(lambda: f.read(1 << 20), b''):
+                h.update(chunk)
+        assert entry['sha256'] == h.hexdigest()
+        assert entry['bytes'] == os.path.getsize(path)
+
+
+def test_manifest_schema_matches_training_config(tmp_path):
+    """The manifest's schema block must repeat the values written to training_config.json."""
+    base_path = str(tmp_path)
+    net = _build_tiny_model()
+    net.save(os.path.join(base_path, 'lstm_model.h5'))
+    config = _write_training_config(base_path)
+    _seed_index_files(base_path)
+
+    export_mobile.export_all(base_path)
+
+    with open(os.path.join(base_path, 'manifest.json')) as f:
+        manifest = json.load(f)
+
+    for key in ('encoder_days', 'decoder_days', 'encoder_features',
+                'decoder_features', 'target_features',
+                'num_stations', 'num_basins'):
+        assert manifest['schema'][key] == config[key]
diff --git a/tests/test_external_baselines.py b/tests/test_external_baselines.py
index 82dd7b2..9cb09a0 100644
--- a/tests/test_external_baselines.py
+++ b/tests/test_external_baselines.py
@@ -2,10 +2,11 @@
 Smoke tests for the CBRFC + S2F baseline modules.
 
 Both modules expose a `baseline_predictions(test_samples)` hook that train.py
-calls during evaluation. Until the historical archive integrations are wired
-in (see module docstrings), both must return None cleanly -- not crash, not
-log noise, not pollute the persistence comparison. These tests pin that
-contract so the train.py integration stays safe.
+calls during evaluation. The CBRFC module fetches forecasts (live AHPS for
+today, NWPS archive for historical anchors) -- when no LID is mapped for a
+site, or the archive returns no forecast for an anchor date, the module must
+return None / empty cleanly. The S2F module remains a stub; same contract.
+These tests pin that contract so the train.py integration stays safe.
 """
 
 import numpy as np
@@ -31,9 +32,10 @@ def _sample(site_id='USGS:09163500'):
 
 
 def test_cbrfc_baseline_returns_none_when_no_lid_mapped():
-    # The default _AHPS_LID_TABLE is empty -> every site has no LID -> every
-    # fetch returns empty -> the predictions stack is None (skip cleanly).
-    pred = get_cbrfc.baseline_predictions([_sample()])
+    # The seeded cbrfc_lid_map.json carries no site mappings by default --
+    # the comment-only file means every fetch returns empty and the stacked
+    # prediction is None (skip cleanly).
+    pred = get_cbrfc.baseline_predictions([_sample('USGS:UNMAPPED')])
     assert pred is None
 
 
@@ -42,14 +44,14 @@ def test_cbrfc_baseline_returns_none_for_empty_test_set():
 
 
 def test_cbrfc_fetch_current_skips_when_no_lid():
-    df = get_cbrfc.fetch_current('USGS:UNKNOWN')
+    df = get_cbrfc.fetch_current('USGS:UNMAPPED')
     assert df.empty
     assert list(df.columns) == ['Date', 'cbrfc_flow']
 
 
-def test_cbrfc_fetch_historical_returns_empty_until_archive_wired():
-    # Anchor in the past -> archive lookup is stubbed -> empty.
-    df = get_cbrfc.fetch('USGS:09163500', '2024-01-01')
+def test_cbrfc_fetch_historical_skips_when_no_lid():
+    # Anchor in the past, no LID mapping -> empty (no NWPS request issued).
+    df = get_cbrfc.fetch('USGS:UNMAPPED', '2024-01-01')
     assert df.empty
 
 
diff --git a/tests/test_get_cbrfc.py b/tests/test_get_cbrfc.py
new file mode 100644
index 0000000..b75ddb3
--- /dev/null
+++ b/tests/test_get_cbrfc.py
@@ -0,0 +1,134 @@
+"""
+Unit tests for the CBRFC NWPS historical archive path in get_cbrfc.py.
+
+These tests do NOT make live network calls -- they monkey-patch
+data_utils.request_with_retry to return canned NWPS payloads, so they run in
+the default `pytest -m "not network"` selection.
+"""
+
+import json
+from datetime import date
+from unittest.mock import MagicMock
+
+import pandas as pd
+import pytest
+
+from data import get_cbrfc
+
+
+@pytest.fixture(autouse=True)
+def _reset_lid_cache():
+    """Clear the module-level LID-table cache between tests."""
+    if hasattr(get_cbrfc._load_lid_table, '_cache'):
+        delattr(get_cbrfc._load_lid_table, '_cache')
+    yield  # the test body runs here; the same cleanup is repeated afterwards
+    if hasattr(get_cbrfc._load_lid_table, '_cache'):
+        delattr(get_cbrfc._load_lid_table, '_cache')
+
+
+def _seed_lid_map(monkeypatch, mapping):
+    """Stub _load_lid_table so the tests don't depend on the on-disk JSON."""
+    monkeypatch.setattr(get_cbrfc, '_load_lid_table', lambda: mapping)
+
+
+def _mock_response(payload, monkeypatch):
+    """Make data_utils.request_with_retry return a Response-shaped object whose .json() yields `payload`."""
+    response = MagicMock()
+    response.json.return_value = payload
+    response.text = json.dumps(payload)  # same payload serialized, for code that reads .text rather than .json()
+    monkeypatch.setattr(get_cbrfc.data_utils, 'request_with_retry',
+                        lambda *args, **kwargs: response)
+
+
+def test_nwps_historical_parses_daily_forecast(monkeypatch):  # two readings on 2024-01-02 must become one daily row
+    _seed_lid_map(monkeypatch, {'USGS:09163500': 'CRSC2'})
+    payload = {
+        'data': [
+            {'validTime': '2024-01-02T00:00:00Z', 'primary': 100.0},
+            {'validTime': '2024-01-02T12:00:00Z', 'primary': 120.0},  # collapses to mean
+            {'validTime': '2024-01-03T00:00:00Z', 'primary': 150.0},
+            {'validTime': '2024-01-04T00:00:00Z', 'primary': 140.0},
+        ]
+    }
+    _mock_response(payload, monkeypatch)
+
+    df = get_cbrfc.fetch('USGS:09163500', '2024-01-01', horizon_days=14)
+    assert list(df.columns) == ['Date', 'cbrfc_flow']
+    # Sub-daily values for 2024-01-02 collapse to the daily mean of 110.0.
+    by_date = dict(zip(df['Date'], df['cbrfc_flow']))  # lookups below use 'YYYY-MM-DD' strings as keys
+    assert by_date['2024-01-02'] == pytest.approx(110.0)
+    assert by_date['2024-01-03'] == pytest.approx(150.0)
+    assert by_date['2024-01-04'] == pytest.approx(140.0)
+
+
+def test_nwps_historical_honors_horizon_cap(monkeypatch):
+    _seed_lid_map(monkeypatch, {'USGS:09163500': 'CRSC2'})
+    # 20 distinct days; horizon_days=5 should keep only the first 5.
+    payload = {
+        'data': [
+            {'validTime': f'2024-01-{d:02d}T00:00:00Z', 'primary': float(d)}
+            for d in range(2, 22)
+        ]
+    }
+    _mock_response(payload, monkeypatch)
+
+    df = get_cbrfc.fetch('USGS:09163500', '2024-01-01', horizon_days=5)
+    assert len(df) == 5
+
+
+def test_nwps_historical_returns_empty_when_lid_missing(monkeypatch):
+    _seed_lid_map(monkeypatch, {})  # no mapping
+    # The fetch should short-circuit before issuing a request.
+    sentinel = MagicMock(side_effect=AssertionError("request_with_retry must not be called"))
+    monkeypatch.setattr(get_cbrfc.data_utils, 'request_with_retry', sentinel)
+
+    df = get_cbrfc.fetch('USGS:UNMAPPED', '2024-01-01')
+    assert df.empty
+
+
+def test_nwps_historical_returns_empty_on_request_failure(monkeypatch):
+    _seed_lid_map(monkeypatch, {'USGS:09163500': 'CRSC2'})
+    monkeypatch.setattr(get_cbrfc.data_utils, 'request_with_retry',
+                        lambda *args, **kwargs: None)  # the request layer hands back None instead of a response
+
+    df = get_cbrfc.fetch('USGS:09163500', '2024-01-01')
+    assert df.empty
+
+
+def test_nwps_historical_tolerates_alt_schema_wrapper(monkeypatch):
+    """NWPS occasionally wraps the data array under `forecast` instead of top-level `data`."""
+    _seed_lid_map(monkeypatch, {'USGS:09163500': 'CRSC2'})
+    payload = {
+        'forecast': {
+            'data': [
+                {'validTime': '2024-01-02T00:00:00Z', 'primary': 200.0},
+            ]
+        }
+    }
+    _mock_response(payload, monkeypatch)
+
+    df = get_cbrfc.fetch('USGS:09163500', '2024-01-01')
+    assert len(df) == 1
+    assert df.iloc[0]['cbrfc_flow'] == pytest.approx(200.0)
+
+
+def test_fetch_today_uses_ahps_not_nwps(monkeypatch):
+    """For anchor_date >= today, fetch() must use the AHPS live path."""
+    _seed_lid_map(monkeypatch, {'USGS:09163500': 'CRSC2'})
+    calls = []  # every URL passed to the stubbed request function, in call order
+
+    def _fake_request(url, **kwargs):
+        calls.append(url)
+        response = MagicMock()
+        response.text = ''  # empty AHPS XML
+        return response
+
+    monkeypatch.setattr(get_cbrfc.data_utils, 'request_with_retry', _fake_request)
+    get_cbrfc.fetch('USGS:09163500', date.today())  # return value ignored; only the requested URL is checked
+    assert calls and calls[0] == get_cbrfc.AHPS_FORECAST_URL
+
+
+def test_lid_map_file_skips_metadata_keys():
+    """The on-disk cbrfc_lid_map.json keeps a leading underscore comment row; loader must ignore it."""
+    table = get_cbrfc._load_lid_table()  # real loader, not stubbed in this test
+    assert all(not k.startswith('_') for k in table)