Implement temporal resampling for derived managed datasets [CLIM-679]#63
Conversation
Pull request overview
This PR introduces temporal resampling as a first-class “derived dataset” workflow in Climate API, enabling locally managed Zarr artifacts to be materialized into coarser temporal resolutions (e.g., hourly→daily, daily→weekly/monthly) and surfaced through the existing managed-dataset/publication surfaces.
Changes:
- Adds `sync_kind: derived` + `resample` validation to the dataset registry and corresponding schemas/logic, including new derived dataset templates for CHIRPS3 and ERA5-Land.
- Extends shared time utilities + sync planning helpers to support ISO weekly periods (`YYYY-Www`).
- Adds a new synchronous derived processing API (`POST /processes/resample`) plus a local resampling/materialization service that persists derived Zarr artifacts and registers them in the artifact store.
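The ISO weekly period format (`YYYY-Www`) added here can be handled with the standard library alone. A minimal sketch of the parsing logic (the helper name is illustrative, not the PR's actual `shared/time.py` API):

```python
from datetime import datetime, timedelta


def parse_iso_week(period: str) -> tuple[datetime, datetime]:
    """Parse an ISO weekly period string ("YYYY-Www") into its
    [start, end) datetime bounds. Hypothetical helper; the real
    function names in shared/time.py may differ."""
    year_part, week_part = period.split("-W")
    # ISO weeks start on Monday; weekday 1 = Monday in fromisocalendar.
    start = datetime.fromisocalendar(int(year_part), int(week_part), 1)
    return start, start + timedelta(weeks=1)


start, end = parse_iso_week("2024-W05")  # week 5 of 2024 starts Monday 2024-01-29
```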
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_shared_time.py | Adds weekly-period parsing/normalization test coverage. |
| tests/test_processing_routes.py | Adds route-level tests for POST /processes/resample. |
| tests/test_processing_resample.py | Adds resampling/materialization tests (hourly→daily, daily→weekly/monthly, overwrite/reuse cases). |
| tests/test_datasets_sync.py | Adds weekly support tests for sync planning helpers and derived sync-kind behavior. |
| tests/test_dataset_registry.py | Adds validation tests for sync_kind: derived and resample blocks. |
| src/climate_api/shared/time.py | Implements weekly period support and weekly-aware numpy period conversion. |
| src/climate_api/publications/services.py | Extracts managed_dataset_id_for_scope() for stable managed dataset IDs from scope inputs. |
| src/climate_api/processing/__init__.py | Introduces the processing package marker. |
| src/climate_api/processing/routes.py | Adds /processes/resample route handler. |
| src/climate_api/processing/schemas.py | Adds Pydantic request/response models for the resample process API. |
| src/climate_api/processing/services.py | Adds service wrapper to run resample materialization and return dataset summaries. |
| src/climate_api/processing/resample.py | Implements the derived resampling materialization pipeline and Zarr persistence. |
| src/climate_api/main.py | Mounts the new processing router under /processes. |
| src/climate_api/ingestions/sync_engine.py | Adds weekly handling in _default_target_end / _next_period_start, and marks derived datasets as not syncable (for now). |
| src/climate_api/ingestions/services.py | Adds derived artifact storage helper and overwrite-aware artifact-record upsert support. |
| src/climate_api/ingestions/schemas.py | Adds SyncKind.DERIVED. |
| src/climate_api/data_registry/services/datasets.py | Adds dataset-template validation for resample blocks and supported derived sync kind. |
| src/climate_api/data_accessor/services/accessor.py | Normalizes temporal coverage scalar conversion for numpy/xarray outputs (incl. weekly). |
| data/datasets/era5_land.yaml | Adds derived daily ERA5-Land templates (temperature + precipitation). |
| data/datasets/chirps3.yaml | Adds derived weekly/monthly CHIRPS3 precipitation templates. |
| .gitignore | Ignores runtime-generated data/derived. |
```python
existing = ingestion_services._find_existing_artifact(
    dataset_id=str(target_dataset["id"]),
    request_scope=ArtifactRequestScope(
        start=normalized_start,
        end=normalized_end,
        extent_id=extent_id,
        bbox=(bbox[0], bbox[1], bbox[2], bbox[3]) if bbox is not None and extent_id is None else None,
    ),
    prefer_zarr=True,
)
if existing is not None and not overwrite:
```
```python
time_dim = get_time_dim(source_ds)
target_end_exclusive = parse_period_string_to_datetime(
    sync_engine._next_period_start(end, period_type=target_period_type)
).replace(tzinfo=None)
target_start = parse_period_string_to_datetime(start).replace(tzinfo=None)
subset = source_ds.where(source_ds[time_dim] >= np.datetime64(target_start), drop=True)
subset = subset.where(subset[time_dim] < np.datetime64(target_end_exclusive), drop=True)
if subset.sizes.get(time_dim, 0) == 0:
    raise HTTPException(status_code=409, detail="Source artifact contains no data for the requested resample range")
source_end = _coord_to_datetime(subset[time_dim].values[-1])
```
```python
DATA_DIR = Path(__file__).resolve().parent.parent.parent.parent / "data"
ARTIFACTS_DIR = DATA_DIR / "artifacts"
ARTIFACTS_INDEX_PATH = ARTIFACTS_DIR / "records.json"
DERIVED_DIR = DATA_DIR / "derived"
```
turban
left a comment
This is a solid foundation for derived datasets. The core resampling logic is well-structured, the test coverage (26 new test functions across test_processing_resample.py and test_processing_routes.py) is thorough, and the week/month edge-case handling in _drop_incomplete_edge_periods is the kind of correctness work that often gets skipped. The observations below are organised by theme.
Blockers are things that will cause a direct conflict or rework when PR #59 (CLIM-683) or the process registry (#65) lands. Suggestions are improvements that aren't blocking.
Blockers — conflicts with PR #59 (CLIM-683)
1. data_registry/services/datasets.py — reverts the importlib.resources and CLIMATE_API_CONFIG work
PR #63 is branched from main before CLIM-683 merges. The version of datasets.py on this branch retains the Path(__file__)-relative CONFIGS_DIR constant and a list_datasets() that knows nothing about CLIMATE_API_CONFIG. CLIM-683 replaces this with importlib.resources for bundled data and a merged overlay from CLIMATE_API_CONFIG.datasets_dir. When rebasing, keep the _load_builtin_datasets() / _load_from_dir() split, and integrate _validate_resample into the merged _validate_dataset_template.
2. data_registry/services/datasets.py — ingestion block validation is absent
CLIM-683 requires an ingestion.eo_function key for non-derived templates. PR #63's validator doesn't have this (it predates the rename from cache_info). After rebase, validation needs a conditional:
```python
if sync_kind != "derived":
    ...  # require ingestion.eo_function (from CLIM-683)
else:
    ...  # require resample block (from PR #63)
```

Without this, a non-derived template with a missing eo_function will load silently and fail only when the sync engine tries to call it.
3. main.py — /extents prefix reverts to pre-CLIM-683 form
CLIM-683 changes the route prefix from /extents to /extent (singular). PR #63 re-registers it at /extents. After rebase this should be /extent.
4. main.py — module-level app instead of create_app() factory
CLIM-683 wraps all router registration inside a create_app() factory function. The new processing_routes import and include_router call should go inside create_app(), not at module level.
5. Dataset YAMLs — cache_info retained; ingestion absent on derived templates
CLIM-683 renames cache_info to ingestion across all source dataset templates. The derived templates have neither, which is correct — but only once validation correctly exempts derived from the ingestion requirement (see point 2).
Blockers — conflicts with issue #65 (process registry)
6. resample YAML key will need renaming when the process registry lands
Issue #65 proposes a two-level design:
```yaml
sync_kind: derived
processing:
  process_id: resample
  params:
    method: mean
    period: monthly
```

PR #63 uses a top-level `resample:` key. When normals or any second process is added, it needs its own top-level key and its own dispatch branch. Renaming `resample:` to `processing:` now (keeping the contents the same, adding `process_id: resample`) makes the #65 migration a schema addition rather than a rename across all built-in YAMLs.
7. processing/routes.py — hardcoded /resample path is not extensible
Issue #65 specifies POST /ogcapi/processes/{id}/execution. POST /processes/resample is a different surface. When normals is added, this becomes either a second hardcoded endpoint or a full refactor. Moving to /processes/{process_id}/execution now — even with a single registered process — makes the extension path clear.
8. ResampleProcessResponse.status: str is incompatible with the async 202 model in #65
The route always returns status="completed" synchronously. Issue #65 describes an async flow: 202 Accepted + jobID, polled via GET /ogcapi/jobs/{jobID}. Typing status as Literal["completed"] now at least surfaces the mismatch at the schema level when async execution is added.
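The suggested tightening, sketched with pydantic (field names other than `status` are guesses from the PR text):

```python
from typing import Literal

from pydantic import BaseModel


class ResampleProcessResponse(BaseModel):
    # Literal["completed"] makes the synchronous-only contract explicit:
    # adding an async "accepted" state later is a visible schema change
    # rather than a silently widened str.
    status: Literal["completed"]
    dataset_id: str


resp = ResampleProcessResponse(status="completed", dataset_id="chirps3_precipitation_weekly")
```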
Suggestions
9. processing/resample.py — DERIVED_DATA_DIR uses __file__-relative path arithmetic
```python
DERIVED_DATA_DIR = Path(__file__).resolve().parent.parent.parent.parent / "data" / "derived"
```

Four levels of `.parent` from a deeply nested file breaks in wheel installs exactly as the old CONFIGS_DIR did (see CLIM-683). A `derived_data_dir` config key (with a default relative to the config file) would be consistent with how CLIM-683 resolved `datasets_dir`.
10. processing/resample.py — calls three private functions from ingestion_services
_default_request_end, _normalize_request_period, and _find_existing_artifact are imported across module boundaries with underscore names. _default_request_end and _normalize_request_period are pure period utilities that belong in shared/time.py. _find_existing_artifact is a registry read that should either be made public or wrapped. This matters especially for #65's ProcessContext, which should surface these capabilities without requiring every process function to know ingestion service internals.
11. processing/resample.py — ds.chunk("auto") diverges from the pipeline's chunk strategy
build_dataset_zarr computes chunk sizes tuned to period_type. chunk("auto") delegates to Dask's heuristic, which has no knowledge of the Climate API's temporal access pattern. Consider passing period_type into _write_resampled_zarr and reusing the same sizing logic.
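A sketch of the period-aware alternative to `chunk("auto")`; the chunk sizes below are illustrative, not the values `build_dataset_zarr` actually computes:

```python
# Hypothetical time-dimension chunk sizes tuned to the temporal
# resolution, mirroring the idea of reusing one sizing policy.
PERIOD_TIME_CHUNKS = {
    "hourly": 24 * 7,   # one week of hourly steps per chunk
    "daily": 365,       # one year of daily steps per chunk
    "weekly": 52,
    "monthly": 120,
}


def time_chunk_for(period_type: str) -> int:
    """Return the time-dim chunk length for a period type, with a
    conservative fallback for unknown periods."""
    return PERIOD_TIME_CHUNKS.get(period_type, 100)
```

The resampled dataset would then be chunked with something like `ds.chunk({time_dim: time_chunk_for(period_type)})` instead of delegating to Dask's heuristic.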
12. _coord_to_datetime duplicated between resample.py and shared/time.py
Both files define a private function that converts a numpy datetime scalar to a Python datetime. resample.py's version has an isinstance(value, datetime) guard the shared/time.py version lacks. Consolidate into one exported function in shared/time.py (keeping the guard).
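A sketch of the consolidated converter for `shared/time.py`, keeping the `isinstance` guard (the exported name is a suggestion, not the PR's):

```python
from datetime import datetime

import numpy as np


def coord_to_datetime(value) -> datetime:
    """Convert a numpy datetime64 scalar (or an existing datetime) to a
    timezone-naive Python datetime. The datetime pass-through is the
    guard resample.py's copy has and shared/time.py's lacks."""
    if isinstance(value, datetime):
        return value
    # Microsecond precision round-trips cleanly to datetime.datetime.
    return value.astype("datetime64[us]").astype(datetime)
```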
13. processing/routes.py — imports private _get_dataset_or_404 from a sibling router
```python
from climate_api.data_registry.routes import _get_dataset_or_404
```

This couples two route modules and bypasses the service layer. Call registry_datasets.get_dataset(dataset_id) directly and raise HTTPException(404) at the call site, matching the pattern in extents/routes.py.
14. processing/routes.py — no sync_kind guard at the route level
If a caller sends a non-derived dataset id, the error surfaces as HTTPException(400, "Dataset ... is not configured for resampling") from deep inside the service. A one-line check at the route entry point:
```python
if dataset.get("sync_kind") != "derived":
    raise HTTPException(status_code=400, detail=f"'{request.dataset_id}' is not a derived dataset")
```

This makes the intent clear and avoids traversing normalisation and source-resolution logic for the common miscall case. A route-level test for this path is also missing from test_processing_routes.py.
15. ingestions/sync_engine.py — datetime.min.time() in the weekly branch
datetime.min.time() reads oddly — time() or time(0) is clearer that midnight is intended. The daily branch just adds timedelta(days=1) to a date; keeping the weekly branch at the same level of abstraction would be consistent.
16. ingestions/services.py:store_materialized_zarr_artifact — parameters shadowed by normalised versions
start and end are rebound in-place after normalisation. A local name (normalised_start, normalised_end) prevents accidentally using the un-normalised value above the rebind.
Summary
| # | Location | Theme | Kind |
|---|---|---|---|
| 1 | data_registry/services/datasets.py | `importlib.resources` regression | Blocker vs #59 |
| 2 | data_registry/services/datasets.py | ingestion validation removed | Blocker vs #59 |
| 3 | main.py | `/extents` → `/extent` prefix | Blocker vs #59 |
| 4 | main.py | module-level app vs `create_app()` | Blocker vs #59 |
| 5 | data/datasets/*.yaml | `cache_info` → `ingestion` | Blocker vs #59 |
| 6 | data/datasets/*.yaml | `resample:` key vs `processing.process_id` | Blocker vs #65 |
| 7 | processing/routes.py | hardcoded `/resample` path | Blocker vs #65 |
| 8 | processing/schemas.py | `status: str` vs async 202 model | Blocker vs #65 |
| 9 | processing/resample.py | `__file__`-relative DERIVED_DATA_DIR | Suggestion |
| 10 | processing/resample.py | private cross-module calls | Suggestion |
| 11 | processing/resample.py | `chunk("auto")` inconsistency | Suggestion |
| 12 | resample.py / shared/time.py | `_coord_to_datetime` duplication | Suggestion |
| 13 | processing/routes.py | private import from sibling router | Suggestion |
| 14 | processing/routes.py | missing `sync_kind` guard | Suggestion |
| 15 | ingestions/sync_engine.py | `datetime.min.time()` clarity | Suggestion |
| 16 | ingestions/services.py | parameter shadowing | Suggestion |
Summary
This PR adds temporal resampling as a first-class derived-data workflow in Climate API. It allows coarser-period GeoZarr datasets to be materialized from existing locally managed source datasets and exposed through the normal managed dataset, STAC, and OGC API surfaces.
Initial derived templates included here:
- `chirps3_precipitation_weekly`
- `chirps3_precipitation_monthly`
- `era5land_temperature_daily`
- `era5land_precipitation_daily`

What Changed
- Adds `sync_kind: derived` to dataset registry and sync schemas
- Adds `resample` blocks to the new derived dataset templates
- Adds `POST /processes/resample`
- Ignores `data/derived` in git since it is generated runtime state

Behavior
- `POST /processes/resample` materializes the derived Zarr under `data/derived/{managed_dataset_id}.zarr`
- `publish=true` publishes the derived artifact immediately after materialization, so it appears in `/ogcapi/collections` and the other normal publication surfaces
- `publish=false` keeps the derived dataset materialized but unpublished
- `overwrite=false` reuses an existing matching artifact
- `overwrite=true` rematerializes the derived Zarr and updates the artifact record
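A hedged example of invoking the new synchronous endpoint from a client; `dataset_id`, `publish`, and `overwrite` come from the PR text, while the host/port and any other request fields are assumptions:

```python
import json
from urllib.request import Request, urlopen

# Request body for the synchronous resample process. Field names beyond
# dataset_id/publish/overwrite are not confirmed by the PR description.
body = {
    "dataset_id": "chirps3_precipitation_weekly",
    "publish": True,
    "overwrite": False,
}

req = Request(
    "http://localhost:8000/processes/resample",  # assumed local dev host
    data=json.dumps(body).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# response = urlopen(req)  # requires a running Climate API instance
```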