Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/reservoir_mapping.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Station -> USBR RISE catalog-item-id mapping for reservoir features.
#
# Format (comma-separated, one record per line):
# <site_id>, <reservoir_name>, <storage_itemId>, <release_itemId>
#
# Where:
# - site_id matches an entry in .github/site_ids.txt (e.g. USGS:09163500)
# - reservoir_name is human-readable, used only for logging
# - storage_itemId is a USBR RISE catalog-item id for a storage timeseries
# (acre-feet, ideally daily)
# - release_itemId is a USBR RISE catalog-item id for an outflow / release
# timeseries (cfs, ideally daily). Use an empty trailing comma if the
# reservoir has no release series available.
#
# A station with no entries here is treated as unregulated -- both reservoir
# features default to 0 and the reservoir_observed indicator stays at 0,
# which is the correct semantic for an unregulated headwater gauge.
#
# To find catalog-item ids, browse https://data.usbr.gov/rise/ or query
# https://data.usbr.gov/rise/api/catalog-item?query=<reservoir name>
# and look for the daily storage / release records you want.
#
# Multiple reservoirs may map to one station; storage values are summed
# (total water held back) and release values are summed (total outflow) at
# read time.
#
# Lines beginning with # are comments and ignored. Empty lines are ignored.
#
# Example (commented out until you've verified the right itemIds for your
# stations -- the IDs below are illustrative placeholders):
#
# USGS:09163500, Lake Powell, 6126, 6127
# USGS:09114500, Blue Mesa Reservoir, 6135, 6136
26 changes: 26 additions & 0 deletions .github/workflows/ml_training.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ on:
schedule:
- cron: '0 0 * * 1' # Run weekly at midnight on Monday
workflow_dispatch:
inputs:
disable_smap:
description: 'Skip SMAP soil moisture (ablation)'
type: boolean
default: false
disable_drought:
description: 'Skip USDM drought (ablation)'
type: boolean
default: false
disable_reservoir:
description: 'Skip USBR reservoir (ablation)'
type: boolean
default: false

jobs:
train:
Expand All @@ -19,11 +32,24 @@ jobs:
python-version: '3.11'

- name: Install dependencies
# earthaccess + h5py + shapely are needed for the SMAP soil-moisture
# fetch wired into combine_data. They are installed AFTER tensorflow so
# pip resolves the shared transitive deps (notably typing-extensions)
# against the tensorflow pin. If the version resolver cannot find a
# compatible earthaccess, combine_data still runs -- the SMAP fetch
# silently degrades to "no data" and soil_moisture defaults to 0.
run: |
python -m pip install --upgrade pip
pip install -r openFlowML/requirements.txt
pip install earthaccess h5py shapely

- name: Train model
env:
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
OPENFLOW_DISABLE_SMAP: ${{ inputs.disable_smap && '1' || '' }}
OPENFLOW_DISABLE_DROUGHT: ${{ inputs.disable_drought && '1' || '' }}
OPENFLOW_DISABLE_RESERVOIR: ${{ inputs.disable_reservoir && '1' || '' }}
run: python openFlowML/train.py

# The model is not usable for inference without the scaler parameters
Expand Down
209 changes: 183 additions & 26 deletions openFlowML/combine_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
gaps are dropped rather than filled with a pooled (cross-station) mean
- the per-station frames carry a clean 'site_id' column

TODO (later in Phase 2): wire in SWE as a history-window feature.
Phase 4 (SMAP soil moisture):
- Soil moisture is fetched per station from NASA SMAP L3 enhanced via
nasa_moisture (lazy import; failure degrades to "no SMAP" for that site).
- Treated like SWE in the spine: slow-varying, longer interior interpolation
limit, missing rows default to 0 rather than being dropped.
- Set OPENFLOW_DISABLE_SMAP=1 to run the ablation baseline without SMAP.
"""

if not logging.getLogger().hasHandlers():
Expand All @@ -36,6 +41,17 @@
# SWE changes slowly (snowpack accumulates/melts over weeks), so we tolerate
# longer interior gaps in the SWE series before giving up on a value.
MAX_SWE_GAP_DAYS = 30
# SMAP has a ~1-3 day revisit cadence per pass; gaps come from RFI / dense
# vegetation / frozen ground. Soil moisture itself is slow-varying so the same
# generous interior interpolation limit as SWE is appropriate.
MAX_SM_GAP_DAYS = 30
# USDM is published weekly; ffill across 8-day gaps is the natural cadence
# for ffill-only handling. Drought intensity is even slower-varying than SWE.
MAX_DROUGHT_GAP_DAYS = 14
# Reservoir storage / release are reported daily by USBR; gaps are short
# (occasional missing days), so the regular MAX_GAP_DAYS interpolation limit
# is enough. Reservoir state is also slow-varying.
MAX_RESERVOIR_GAP_DAYS = 14


def _to_daily_series(df, value_columns, daily_index):
Expand All @@ -52,15 +68,19 @@ def _to_daily_series(df, value_columns, daily_index):


def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,
swe_data=None, huc8=None):
swe_data=None, sm_data=None,
drought_data=None, reservoir_data=None,
huc8=None):
"""
Merge a site's NOAA temperature, flow, and SWE data onto one regular daily
index.
Merge a site's flow, NOAA temperature, SWE, SMAP soil moisture, USDM
drought, and USBR reservoir series onto one regular daily index.

Short interior flow/temp gaps (<= MAX_GAP_DAYS) are interpolated time-aware;
rows still missing a core value afterwards are dropped (no pooled-mean
fill). SWE is interpolated with a longer limit (it's slow-varying) and any
remaining missing values default to 0 -- they don't drop the row.
fill). All Phase 4/5 auxiliary features (SWE, soil_moisture, drought_index,
reservoir_storage, reservoir_release) are slow-varying, get longer
interpolation limits, and missing values are imputed per the column-
specific semantics rather than dropping the row.
"""
if 'Date' not in noaa_data or 'Date' not in flow_data:
raise ValueError("'Date' column missing in one of the dataframes")
Expand Down Expand Up @@ -96,12 +116,88 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,
else:
combined['SWE'] = float('nan')

# Drop rows still missing any core flow/temp value. SWE is NOT in
# CORE_COLUMNS, so a missing SWE never drops a row.
# SMAP soil moisture: slow-varying surface state; generous interior
# interpolation limit (matches SWE). Edge handling happens AFTER the
# core-column dropna below, where we have the final row set.
if sm_data is not None and not sm_data.empty:
sm_daily = _to_daily_series(sm_data, ['soil_moisture'], daily_index)
sm_daily['soil_moisture'] = pd.to_numeric(sm_daily['soil_moisture'], errors='coerce')
sm_daily = sm_daily.interpolate(method='time', limit=MAX_SM_GAP_DAYS, limit_area='inside')
combined['soil_moisture'] = sm_daily['soil_moisture']
else:
combined['soil_moisture'] = float('nan')

# USDM drought intensity: weekly snapshots, forward-filled. ffill is
# the right move for a step-function weekly series (interpolation
# would smear discrete category changes into ramps).
if drought_data is not None and not drought_data.empty:
d_daily = _to_daily_series(drought_data, ['drought_index'], daily_index)
d_daily['drought_index'] = pd.to_numeric(d_daily['drought_index'], errors='coerce')
d_daily['drought_index'] = d_daily['drought_index'].ffill(limit=MAX_DROUGHT_GAP_DAYS)
combined['drought_index'] = d_daily['drought_index']
else:
combined['drought_index'] = float('nan')

# USBR reservoir storage + release: daily series, slow-varying.
# Interpolate short interior gaps. Empty df = unregulated station
# (no mapping entry); the column stays NaN and is handled below.
if reservoir_data is not None and not reservoir_data.empty:
r_daily = _to_daily_series(
reservoir_data,
['reservoir_storage', 'reservoir_release'],
daily_index,
)
for col in ('reservoir_storage', 'reservoir_release'):
r_daily[col] = pd.to_numeric(r_daily[col], errors='coerce')
r_daily = r_daily.interpolate(method='time',
limit=MAX_RESERVOIR_GAP_DAYS,
limit_area='inside')
combined['reservoir_storage'] = r_daily['reservoir_storage']
combined['reservoir_release'] = r_daily['reservoir_release']
else:
combined['reservoir_storage'] = float('nan')
combined['reservoir_release'] = float('nan')

# Drop rows still missing any core flow/temp value. SWE / soil_moisture
# are NOT in CORE_COLUMNS, so a missing one never drops a row.
before = len(combined)
combined = combined.dropna(subset=CORE_COLUMNS)
# Any remaining SWE gaps default to 0 ("no snow data" / out-of-season).
# SWE: 0 means "no snow", which IS a legitimate default; keep that.
combined['SWE'] = combined['SWE'].fillna(0.0)
# Soil moisture: 0 means "Sahara desert", which is NOT a legitimate
# default for a SMAP gap (RFI / frozen ground / sensor outage). We
# carry the last interpolated value forward and backward (SM is slow-
# varying), then fall back to the station's median observed SM where
# ffill/bfill can't reach, and only to 0 if the station has no
# observations at all. An sm_observed indicator (1 = real or short-
# gap interpolated retrieval, 0 = imputed via ffill / median fallback)
# gives the model a way to tell the truth from the imputation.
sm_series = combined['soil_moisture']
combined['sm_observed'] = sm_series.notna().astype('int64')
sm_series = sm_series.ffill().bfill()
median = sm_series.median(skipna=True)
if pd.isna(median):
median = 0.0
combined['soil_moisture'] = sm_series.fillna(median)

# Drought: 0 means "no drought anywhere in the HUC", which IS a
# legitimate default for a missing weekly value. Fill remaining gaps
# with 0 -- the model can treat absence as "we don't have a drought
# signal here" (which behaves equivalently to no drought).
combined['drought_index'] = combined['drought_index'].fillna(0.0)

# Reservoir: 0 storage / 0 release reads as "tiny / empty reservoir",
# which IS misleading -- the right semantic for an unregulated gauge
# is "no upstream reservoir at all". A reservoir_observed indicator
# (1 = mapped + retrieval succeeded for this row, 0 = unmapped / no
# data) tells the model when the storage / release columns are real.
# The numeric columns then default to 0 only as a placeholder; the
# indicator carries the truth.
res_series = combined['reservoir_storage']
combined['reservoir_observed'] = res_series.notna().astype('int64')
combined['reservoir_storage'] = res_series.ffill().bfill().fillna(0.0)
rel_series = combined['reservoir_release']
combined['reservoir_release'] = rel_series.ffill().bfill().fillna(0.0)
logger.info(
"Site %s: %d/%d daily rows usable after gap handling",
site_id, len(combined), before,
Expand All @@ -119,14 +215,25 @@ def merge_dataframes(noaa_data, flow_data, site_id, start_date, end_date,
return pd.DataFrame()


def _disabled(env_var):
"""Truthy-toggle env-var check, used for the per-source ablation levers."""
return os.getenv(env_var, '').strip() in ('1', 'true', 'True')


def fetch_and_process_data(prefix, site_id, start_date, end_date, flow_data):
"""
Resolve a site's coordinates and fetch its NOAA temperature + HUC SWE
series, and its HUC8 basin id (used as a Phase 3 basin embedding key).

Returns (noaa_data, swe_data, huc8). Either dataframe may be empty (SWE
degrades gracefully; missing NOAA causes the caller to skip the site).
huc8 may be None when the lookup fails.
Resolve a site's coordinates and fetch all its auxiliary timeseries:
NOAA temperature, NRCS SWE, SMAP soil moisture, USDM drought, USBR
reservoir storage + release, and the enclosing HUC8 basin id.

Returns a dict with keys:
noaa_data, swe_data, sm_data, drought_data, reservoir_data, huc8
Any data frame may be empty (auxiliary features degrade gracefully; only
missing NOAA causes the caller to skip the site). huc8 may be None.

Per-source env vars short-circuit the corresponding fetch to empty for
ablation runs:
OPENFLOW_DISABLE_SMAP, OPENFLOW_DISABLE_DROUGHT, OPENFLOW_DISABLE_RESERVOIR
"""
if prefix == "USGS":
coords_dict = get_usgs_coordinates(site_id)
Expand All @@ -137,7 +244,7 @@ def fetch_and_process_data(prefix, site_id, start_date, end_date, flow_data):

if not coords_dict:
logger.error(f"Could not resolve coordinates for {prefix}:{site_id}. Skipping...")
return None, None, None
return None

latitude = float(coords_dict['latitude'])
longitude = float(coords_dict['longitude'])
Expand All @@ -149,7 +256,7 @@ def fetch_and_process_data(prefix, site_id, start_date, end_date, flow_data):

if noaa_data is None or noaa_data.empty:
logger.warning(f"No NOAA data available for site ID {site_id}. Skipping...")
return None, None, None
return None

noaa_data = noaa_data.copy()
noaa_data['USGS_site_ID'] = site_id
Expand All @@ -165,17 +272,60 @@ def fetch_and_process_data(prefix, site_id, start_date, end_date, flow_data):
if not huc8:
logger.warning("No HUC8 resolved for %s -- basin embedding will fall back", site_id)

# SWE is a history-window feature; degrade gracefully if the AWDB fetch
# fails -- the row stays, SWE defaults to 0 in merge_dataframes.
# SWE: graceful failure -> empty.
try:
swe_data = get_swe.get_swe(latitude, longitude, start_date, end_date)
except Exception as e:
logger.warning("SWE fetch failed for %s: %s", site_id, e)
swe_data = pd.DataFrame(columns=['Date', 'SWE'])

# Gap handling and numeric coercion happen in merge_dataframes (per station,
# on the regular daily index) -- not here, and never via a pooled mean.
return noaa_data, swe_data, huc8
# SMAP soil moisture: lazy import (heavy deps), env-var lever, graceful.
sm_data = pd.DataFrame(columns=['Date', 'soil_moisture'])
if _disabled('OPENFLOW_DISABLE_SMAP'):
logger.info("OPENFLOW_DISABLE_SMAP set -- skipping SMAP for %s", site_id)
else:
try:
from data import nasa_moisture
sm_data = nasa_moisture.main(latitude, longitude, start_date, end_date)
except ImportError as e:
logger.warning("Soil-moisture deps unavailable (%s); skipping SMAP for %s",
e, site_id)
except Exception as e:
logger.warning("SMAP fetch failed for %s: %s", site_id, e)

# USDM drought: light deps, env-var lever, graceful.
drought_data = pd.DataFrame(columns=['Date', 'drought_index'])
if _disabled('OPENFLOW_DISABLE_DROUGHT'):
logger.info("OPENFLOW_DISABLE_DROUGHT set -- skipping USDM for %s", site_id)
else:
try:
from data import get_drought
drought_data = get_drought.get_drought(latitude, longitude, start_date, end_date)
except Exception as e:
logger.warning("USDM drought fetch failed for %s: %s", site_id, e)

# USBR reservoir: needs a station->reservoir mapping; empty when no entry.
reservoir_data = pd.DataFrame(
columns=['Date', 'reservoir_storage', 'reservoir_release'])
if _disabled('OPENFLOW_DISABLE_RESERVOIR'):
logger.info("OPENFLOW_DISABLE_RESERVOIR set -- skipping RISE for %s",
site_id)
else:
try:
from data import get_reservoir
reservoir_data = get_reservoir.get_reservoir(
f"{prefix}:{site_id}", start_date, end_date)
except Exception as e:
logger.warning("USBR reservoir fetch failed for %s: %s", site_id, e)

return {
'noaa_data': noaa_data,
'swe_data': swe_data,
'sm_data': sm_data,
'drought_data': drought_data,
'reservoir_data': reservoir_data,
'huc8': huc8,
}


def get_site_ids(filename=None):
Expand Down Expand Up @@ -231,15 +381,22 @@ def main(training_num_years=7):
logger.warning(f"Unrecognized prefix for site ID {site_id}. Skipping...")
continue

noaa_dataframe, swe_dataframe, huc8 = fetch_and_process_data(
fetched = fetch_and_process_data(
prefix, id, start_date, end_date, flow_dataframe)
if noaa_dataframe is None or noaa_dataframe.empty or flow_dataframe.empty:
if (fetched is None
or fetched['noaa_data'] is None
or fetched['noaa_data'].empty
or flow_dataframe.empty):
logger.warning(f"No usable data for site ID {site_id}. Skipping...")
continue

merged = merge_dataframes(
noaa_dataframe, flow_dataframe, site_id, start_date, end_date,
swe_data=swe_dataframe, huc8=huc8)
fetched['noaa_data'], flow_dataframe, site_id, start_date, end_date,
swe_data=fetched['swe_data'],
sm_data=fetched['sm_data'],
drought_data=fetched['drought_data'],
reservoir_data=fetched['reservoir_data'],
huc8=fetched['huc8'])
if merged.empty:
logger.warning(f"No usable merged data for site ID {site_id}. Skipping...")
continue
Expand Down
Loading
Loading