Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/ml_training.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ jobs:
# station + basin index maps, and training_config.json. All artifacts
# travel together; manifest.json carries sha256s + schema so the
# mobile app can verify what it downloaded. See docs/INFERENCE.md.
# lstm_model_int8.tflite is shipped only if the int8 parity check in
# export_mobile.py passes; if it didn't ship, softprops/action-gh-release
# quietly skips the missing file.
files: |
lstm_model.h5
lstm_model.mlpackage.zip
lstm_model.tflite
lstm_model_int8.tflite
scalers.json
station_index.json
basin_index.json
Expand All @@ -119,6 +123,7 @@ jobs:
./lstm_model.h5
./lstm_model.mlpackage.zip
./lstm_model.tflite
./lstm_model_int8.tflite
./scalers.json
./station_index.json
./basin_index.json
Expand Down
22 changes: 19 additions & 3 deletions docs/INFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Every successful weekly training run on `main` produces a GitHub Release tagged
| --- | --- |
| `lstm_model.h5` | Canonical Keras model. The source of truth; everything else is derived. |
| `lstm_model.mlpackage.zip` | CoreML mlprogram for iOS (iOS 15+). Unzip and pass to `MLModel(contentsOf:)`. |
| `lstm_model.tflite` | TFLite for Android. May require the Flex delegate — see TFLite note below. |
| `lstm_model.tflite` | TFLite (float32) for Android. May require the Flex delegate — see TFLite note below. |
| `lstm_model_int8.tflite` | Optional dynamic-range int8 TFLite. Smaller, slightly less accurate. Only present when it passed the parity gate; see `manifest.tflite_int8_max_abs_diff`. |
| `scalers.json` | Per-column scaler params (mean, scale, transform). Inputs and outputs are in scaled space; you invert with this. |
| `station_index.json` | Map `site_id` → embedding index. Index `0` is reserved for "unseen". |
| `basin_index.json` | Map HUC8 → embedding index. Index `0` is reserved for "unseen". |
Expand All @@ -37,8 +38,8 @@ The model takes a dict of 5 inputs. Names are stable across exports:

| Name | dtype | Shape | Source |
| --- | --- | --- | --- |
| `encoder_input` | float32 | `[1, 60, len(encoder_features)]` | Last 60 days of features, columns in `training_config.encoder_features` order, pre-scaled. |
| `decoder_input` | float32 | `[1, 14, len(decoder_features)]` | Next 14 days of forecast-time-available features, columns in `training_config.decoder_features` order, pre-scaled. |
| `encoder_input` | float32 | `[1, 60, len(encoder_features)]` | Last 60 days of features, columns in `training_config.encoder_features` order, pre-scaled. Includes observed precipitation (`precipitation`, mm/day from GHCND PRCP). |
| `decoder_input` | float32 | `[1, 14, len(decoder_features)]` | Next 14 days of forecast-time-available features, columns in `training_config.decoder_features` order, pre-scaled. Includes forecast precipitation — pull `precipitation_sum` from Open-Meteo's daily forecast (`data/get_forecast.py` does this server-side; mobile clients can call Open-Meteo directly). |
| `persistence_input` | float32 | `[1, len(target_features)]` | Last encoder-day flow values (the `Min Flow` / `Max Flow` columns from the last encoder row), in scaled space. |
| `station_input` | int32 | `[1]` | `station_index.json[site_id]`, or `0` if the site isn't in the map. |
| `basin_input` | int32 | `[1]` | `basin_index.json[huc8]`, or `0` if the HUC isn't in the map. |
Expand Down Expand Up @@ -92,3 +93,18 @@ Both embedding layers reserve index `0` for unknown. If the user picks a site no
## Reference fixture

The export parity tests in `tests/test_export_parity.py` generate synthetic inputs, run them through Keras / CoreML / TFLite, and assert they agree within `1e-3`. They're the executable spec of this document — when in doubt, read that file.

## Publishing the first release

The weekly cron at `.github/workflows/ml_training.yml` produces a Release tagged `model-YYYY.MM.DD` whenever it runs on `main`. To create the first release before the next Monday cron fires, after this branch is merged:

```bash
gh workflow run ml_training.yml --ref main
gh run watch # follow until done
```

Verify the release appeared with all expected files (8 if int8 shipped, 7 otherwise) and a valid manifest:

```bash
gh release view "model-$(date -u +%Y.%m.%d)" --json assets,name
```
35 changes: 31 additions & 4 deletions openFlowML/combine_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
# Longest interior gap (in days) we are willing to interpolate across for flow
# and temperature, which can change quickly day-to-day.
MAX_GAP_DAYS = 7
# Precipitation is far less amenable to interpolation than temperature: most
# days are dry, storms are spike events, and "average two dry days around a
# storm to fill the storm day" is meaningfully wrong. Use a very short interior
# interpolation limit (carries through ~2-day missing-data sensor outages) and
# then default to 0 ("no rain") for anything still missing.
MAX_PRECIP_GAP_DAYS = 2
# SWE changes slowly (snowpack accumulates/melts over weeks), so we tolerate
# longer interior gaps in the SWE series before giving up on a value.
MAX_SWE_GAP_DAYS = 30
Expand Down Expand Up @@ -94,7 +100,11 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,
)

flow_daily = _to_daily_series(flow_data, ['Min Flow', 'Max Flow'], daily_index)
noaa_daily = _to_daily_series(noaa_data, ['TMIN', 'TMAX'], daily_index)
# NOAA fetch now returns precipitation alongside TMIN/TMAX (get_noaa.py
# surfaces GHCND PRCP as `precipitation` in mm). The column may still
# be absent if the NCEI station had no PRCP coverage at all.
noaa_daily = _to_daily_series(
noaa_data, ['TMIN', 'TMAX', 'precipitation'], daily_index)

combined = pd.concat([flow_daily, noaa_daily], axis=1)
for col in CORE_COLUMNS:
Expand All @@ -104,8 +114,17 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,

# Time-aware interpolation of short interior gaps only (limit_area
# 'inside' means no edge extrapolation). `combined` is a single station,
# so this interpolation never bleeds across station boundaries.
combined = combined.interpolate(method='time', limit=MAX_GAP_DAYS, limit_area='inside')
# so this interpolation never bleeds across station boundaries. We
# interpolate the core flow/temp columns at MAX_GAP_DAYS and handle
# precipitation separately just below with a much shorter limit
# (interpolation across a missed storm day is misleading).
combined[CORE_COLUMNS] = combined[CORE_COLUMNS].interpolate(
method='time', limit=MAX_GAP_DAYS, limit_area='inside')
if 'precipitation' in combined.columns:
combined['precipitation'] = pd.to_numeric(
combined['precipitation'], errors='coerce')
combined['precipitation'] = combined['precipitation'].interpolate(
method='time', limit=MAX_PRECIP_GAP_DAYS, limit_area='inside')

# SWE: separate, slow-varying series; longer interpolation limit.
if swe_data is not None and not swe_data.empty:
Expand Down Expand Up @@ -159,9 +178,17 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,
combined['reservoir_release'] = float('nan')

# Drop rows still missing any core flow/temp value. SWE / soil_moisture
# are NOT in CORE_COLUMNS, so a missing one never drops a row.
# / precipitation are NOT in CORE_COLUMNS, so a missing one never
# drops a row.
before = len(combined)
combined = combined.dropna(subset=CORE_COLUMNS)
# Precipitation: 0 mm means "no measurable rain", a legitimate default
# for any post-interpolation gap (NCEI station with no PRCP coverage,
# short outage that exceeded MAX_PRECIP_GAP_DAYS, etc.). Most days in
# most basins are zero anyway, so this defaults to the modal value.
if 'precipitation' not in combined.columns:
combined['precipitation'] = 0.0
combined['precipitation'] = combined['precipitation'].fillna(0.0)
# SWE: 0 means "no snow", which IS a legitimate default; keep that.
combined['SWE'] = combined['SWE'].fillna(0.0)
# Soil moisture: 0 means "Sahara desert", which is NOT a legitimate
Expand Down
2 changes: 1 addition & 1 deletion openFlowML/data/cbrfc_lid_map.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"_comment": "Map of OpenFlow site_id (USGS:XXXX or DWR:XXXX) to NWS / AHPS 5-letter LID. Used by get_cbrfc.py for both live AHPS forecast retrieval and NWPS historical forecast archive lookup. Add a row when wiring CBRFC for a new gauge; a missing site_id silently skips the CBRFC baseline for that station (it remains a fully optional comparison baseline). Find the LID for a USGS site by searching https://water.weather.gov/ahps2/ for the gauge name and reading the LID from the page URL."
"_comment": "Map of OpenFlow site_id (USGS:XXXX or DWR:XXXX) to NWS / AHPS 5-letter LID. Used by data/get_cbrfc.py for both the live AHPS forecast and the historical NWPS forecast archive. To wire a new gauge, run: python -m data.find_lid --site-id USGS:09163500 -- this prints the closest NWS gauges via the NWPS API. Copy the chosen LID into this file and re-run training. A missing site_id silently skips the CBRFC baseline for that station; CBRFC is an optional comparison baseline, not a training input."
}
142 changes: 142 additions & 0 deletions openFlowML/data/find_lid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
Helper: find candidate NWS / AHPS LIDs near a given latitude/longitude.

The CBRFC baseline (data/get_cbrfc.py) needs a mapping site_id -> NWS LID.
Mappings are curated by hand in data/cbrfc_lid_map.json because there's no
algorithmic equivalence between a USGS site number and an AHPS LID. This
script queries the NWPS gauge index for gauges within a radius and prints
their LID + name + distance so a maintainer can pick the right one.

Usage:
python -m data.find_lid --lat 38.52 --lon -106.96
python -m data.find_lid --site-id USGS:09163500 # auto-resolves coords

Output is a ranked list; the closest gauge by haversine is usually the right
match for a flow-monitoring gauge near the LID's location. Copy the chosen
mapping into data/cbrfc_lid_map.json and re-run train.py to enable CBRFC
backtesting for that site.
"""

import argparse
import logging
import math
import sys
from typing import List, Optional

from data.utils import data_utils

logger = logging.getLogger(__name__)
if not logging.getLogger().hasHandlers():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')

# NWPS publishes a gauges index; lat-bbox filter keeps the response small.
NWPS_GAUGES_URL = "https://api.water.noaa.gov/nwps/v1/gauges"


def _haversine(lat1, lon1, lat2, lon2):
"""Great-circle distance in km."""
R = 6371.0
lat1r, lat2r = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(lat1r) * math.cos(lat2r) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def find_nearest_lids(lat: float, lon: float, *,
bbox_deg: float = 0.5, top_n: int = 5) -> List[dict]:
"""
Return up to `top_n` NWS gauges nearest to (lat, lon) within a
±bbox_deg lat/lon window. Each entry has {lid, name, lat, lon,
distance_km, rfc}.
"""
params = {
# NWPS supports bbox filtering as a comma-joined "minLon,minLat,maxLon,maxLat".
'bbox': f"{lon - bbox_deg},{lat - bbox_deg},{lon + bbox_deg},{lat + bbox_deg}",
}
response = data_utils.request_with_retry(NWPS_GAUGES_URL, params=params)
if response is None:
logger.error("NWPS gauge search returned no response")
return []
try:
payload = response.json()
except ValueError:
logger.error("NWPS returned non-JSON")
return []
gauges = payload.get('gauges') or payload.get('data') or []
enriched = []
for g in gauges:
try:
g_lat = float(g.get('latitude') or g.get('lat'))
g_lon = float(g.get('longitude') or g.get('lon'))
except (TypeError, ValueError):
continue
enriched.append({
'lid': g.get('lid') or g.get('id'),
'name': g.get('name') or g.get('siteName', ''),
'lat': g_lat,
'lon': g_lon,
'distance_km': _haversine(lat, lon, g_lat, g_lon),
'rfc': g.get('rfc') or g.get('forecastCenter', ''),
})
enriched.sort(key=lambda x: x['distance_km'])
return enriched[:top_n]


def _coords_for_site_id(site_id: str) -> Optional[tuple]:
"""Resolve an OpenFlow site_id (USGS:XXX or DWR:XXX) to (lat, lon) via the existing coordinate utilities."""
from data.utils.get_coordinates import get_usgs_coordinates, get_dwr_coordinates
prefix, _, sid = site_id.partition(':')
if prefix == 'USGS':
coords = get_usgs_coordinates(sid)
elif prefix == 'DWR':
coords = get_dwr_coordinates(sid)
else:
logger.error("Unsupported prefix in %s", site_id)
return None
if not coords:
return None
return float(coords['latitude']), float(coords['longitude'])


def main():
parser = argparse.ArgumentParser(description=__doc__)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--site-id', help='OpenFlow site_id like USGS:09163500')
group.add_argument('--latlon', nargs=2, type=float, metavar=('LAT', 'LON'),
help='Direct lat lon')
parser.add_argument('--top-n', type=int, default=5)
parser.add_argument('--bbox-deg', type=float, default=0.5,
help='Search half-window in degrees (default 0.5 ~= 55 km)')
args = parser.parse_args()

if args.site_id:
coords = _coords_for_site_id(args.site_id)
if not coords:
logger.error("Could not resolve coordinates for %s", args.site_id)
return 1
lat, lon = coords
print(f"# Site {args.site_id} -> ({lat}, {lon})")
else:
lat, lon = args.latlon

hits = find_nearest_lids(lat, lon, bbox_deg=args.bbox_deg, top_n=args.top_n)
if not hits:
print("# No NWS gauges found within the bbox; widen --bbox-deg.")
return 1
print(f"# Top {len(hits)} NWS gauges near ({lat:.4f}, {lon:.4f}):")
print("# LID distance_km name RFC")
for h in hits:
print(f" {h['lid']:6s} {h['distance_km']:>10.2f} "
f"{(h['name'] or '')[:40]:40s} {h['rfc']}")
if args.site_id:
top = hits[0]
print()
print(f"# To wire the closest match, append to data/cbrfc_lid_map.json:")
print(f' "{args.site_id}": "{top["lid"]}"')
return 0


if __name__ == "__main__":
sys.exit(main())
55 changes: 36 additions & 19 deletions openFlowML/data/get_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
from data.utils import data_utils

"""
14-day daily temperature forecast for a point.

The model design (see the wiki + the Phase 2 plan) needs forecast TMIN/TMAX as
inputs to the model's forecast window. The official NWS api.weather.gov
forecast caps at ~7 days, so we fetch from Open-Meteo's free public API,
which serves GFS-based daily forecasts out to 16 days -- this is the
"GFS / outlooks to reach 14 days" path the user picked.

Training uses *actual* (shifted) historical TMIN/TMAX in the forecast window;
this module is what an inference run will call. Importantly, this is point
data only -- soil moisture / SWE are current-conditions features, not things
this fetcher produces.
14-day daily TMIN/TMAX + precipitation forecast for a point.

The model design (see the wiki + the Phase 2 plan) needs forecast TMIN/TMAX
*and* daily precipitation as inputs to the model's forecast window. The
official NWS api.weather.gov forecast caps at ~7 days, so we fetch from
Open-Meteo's free public API, which serves GFS-based daily forecasts out to
16 days -- this is the "GFS / outlooks to reach 14 days" path the user picked.

Training uses *actual* (shifted) historical TMIN/TMAX/precipitation in the
forecast window; this module is what an inference run will call. Importantly,
this is point data only -- soil moisture / SWE are current-conditions
features, not things this fetcher produces.

Units: TMIN/TMAX in Fahrenheit (matches GHCND TMIN/TMAX), precipitation in
millimetres (matches GHCND PRCP after combine_data's tenths-of-mm conversion).
"""

if not logging.getLogger().hasHandlers():
Expand All @@ -33,21 +36,23 @@


def _empty_forecast():
return pd.DataFrame(columns=['Date', 'TMIN', 'TMAX'])
return pd.DataFrame(columns=['Date', 'TMIN', 'TMAX', 'precipitation'])


def get_open_meteo_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS, temperature_unit='fahrenheit'):
"""
Fetch a daily TMIN/TMAX forecast from Open-Meteo for the next `days` days
starting today. Default unit is Fahrenheit to match the GHCND TMIN/TMAX
feature scale used during training.
Fetch a daily TMIN/TMAX + precipitation forecast from Open-Meteo for the
next `days` days starting today. Default temperature unit is Fahrenheit to
match the GHCND TMIN/TMAX feature scale used during training. Precipitation
is in mm (Open-Meteo's `precipitation_sum` default), matching the GHCND
PRCP unit after combine_data's tenths-of-mm conversion.
"""
if days < 1 or days > MAX_HORIZON_DAYS:
raise ValueError(f"days must be in 1..{MAX_HORIZON_DAYS}")
params = {
"latitude": lat,
"longitude": lon,
"daily": "temperature_2m_max,temperature_2m_min",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
"forecast_days": days,
"temperature_unit": temperature_unit,
"timezone": "America/Denver",
Expand All @@ -65,16 +70,28 @@ def get_open_meteo_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS, temperature_uni
dates = daily.get('time') or []
tmax = daily.get('temperature_2m_max') or []
tmin = daily.get('temperature_2m_min') or []
precip = daily.get('precipitation_sum') or []
if not dates or len(dates) != len(tmax) or len(dates) != len(tmin):
logger.error("Unexpected Open-Meteo payload shape")
return _empty_forecast()
# precipitation_sum is the newest of the three fields requested; tolerate
# an older Open-Meteo deployment that doesn't return it by zero-filling.
if len(precip) != len(dates):
logger.warning("Open-Meteo returned no precipitation_sum; defaulting to 0")
precip = [0.0] * len(dates)

return pd.DataFrame({'Date': dates, 'TMIN': tmin, 'TMAX': tmax})
return pd.DataFrame({
'Date': dates,
'TMIN': tmin,
'TMAX': tmax,
'precipitation': precip,
})


def get_forecast(lat, lon, days=DEFAULT_HORIZON_DAYS):
"""
Public API: a 14-day daily TMIN/TMAX forecast for (lat, lon).
Public API: a 14-day daily TMIN/TMAX + precipitation forecast for
(lat, lon).

Currently always Open-Meteo (GFS-backed). Kept as a thin wrapper so a
future api.weather.gov-primary / Open-Meteo-fallback blend can drop in
Expand Down
Loading
Loading