
Implement temporal resampling for derived managed datasets [CLIM-679]#63

Open
abyot wants to merge 6 commits into main from temporal-resampling

Conversation


@abyot abyot commented May 6, 2026

Summary

This PR adds temporal resampling as a first-class derived-data workflow in Climate API. It allows coarser-period GeoZarr datasets to be materialized from existing locally managed source datasets and exposed through the normal managed dataset, STAC, and OGC API surfaces.

Initial derived templates included here:

  • chirps3_precipitation_weekly
  • chirps3_precipitation_monthly
  • era5land_temperature_daily
  • era5land_precipitation_daily

What Changed

  • added sync_kind: derived to dataset registry and sync schemas
  • added registry validation for resample blocks
  • added weekly period support across shared time helpers and sync planning helpers
  • added a local resampling materialization service for derived Zarr outputs
  • added artifact persistence for locally materialized derived datasets
  • added POST /processes/resample
  • added real derived dataset YAML templates for CHIRPS and ERA5-Land
  • ignored data/derived in git since it is generated runtime state

Behavior

  • resampling uses already-managed local source artifacts only
  • no upstream download is triggered by /processes/resample
  • output is written under data/derived/{managed_dataset_id}.zarr
  • derived artifacts are stored in the normal artifact registry
  • publish=true publishes the derived artifact immediately after materialization
  • published derived datasets appear in /ogcapi/collections and the other normal publication surfaces
  • publish=false keeps the derived dataset materialized but unpublished
  • incomplete trailing target periods are dropped
  • overwrite=false reuses an existing matching artifact
  • overwrite=true rematerializes the derived Zarr and updates the artifact record
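As a rough illustration of the workflow above, a client call might look like the following. This is a sketch only: field names such as `dataset_id`, `start`, `end`, `publish`, and `overwrite` are assumptions inferred from the behavior notes, not confirmed request-schema names.

```python
import json
import urllib.request

# Hypothetical request body; the field names mirror the behavior notes above
# (publish immediately, do not overwrite an existing matching artifact).
payload = {
    "dataset_id": "chirps3_precipitation_monthly",
    "start": "2024-01",
    "end": "2024-06",
    "publish": True,
    "overwrite": False,
}
request = urllib.request.Request(
    "http://localhost:8000/processes/resample",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(request) would return the derived dataset summary.
```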


Copilot AI left a comment


Pull request overview

This PR introduces temporal resampling as a first-class “derived dataset” workflow in Climate API, enabling locally managed Zarr artifacts to be materialized into coarser temporal resolutions (e.g., hourly→daily, daily→weekly/monthly) and surfaced through the existing managed-dataset/publication surfaces.

Changes:

  • Adds sync_kind: derived + resample validation to the dataset registry and corresponding schemas/logic, including new derived dataset templates for CHIRPS3 and ERA5-Land.
  • Extends shared time utilities + sync planning helpers to support ISO weekly periods (YYYY-Www).
  • Adds a new synchronous derived processing API (POST /processes/resample) plus a local resampling/materialization service that persists derived Zarr artifacts and registers them in the artifact store.
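The ISO weekly period format (`YYYY-Www`) mentioned above can be handled with the standard library alone. This is a sketch of the idea, not the PR's actual helper in `shared/time.py`:

```python
from datetime import date, timedelta

def week_period_bounds(period: str) -> tuple[date, date]:
    """Return (start, exclusive_end) for an ISO-week period like '2024-W05'.

    date.fromisocalendar gives the Monday that starts the ISO week; the
    exclusive end is the following Monday.
    """
    year_part, week_part = period.split("-W")
    start = date.fromisocalendar(int(year_part), int(week_part), 1)
    return start, start + timedelta(days=7)

bounds = week_period_bounds("2024-W05")  # Monday-to-Monday bounds of ISO week 5
```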

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 4 comments.

File Description
tests/test_shared_time.py Adds weekly-period parsing/normalization test coverage.
tests/test_processing_routes.py Adds route-level tests for POST /processes/resample.
tests/test_processing_resample.py Adds resampling/materialization tests (hourly→daily, daily→weekly/monthly, overwrite/reuse cases).
tests/test_datasets_sync.py Adds weekly support tests for sync planning helpers and derived sync-kind behavior.
tests/test_dataset_registry.py Adds validation tests for sync_kind: derived and resample blocks.
src/climate_api/shared/time.py Implements weekly period support and weekly-aware numpy period conversion.
src/climate_api/publications/services.py Extracts managed_dataset_id_for_scope() for stable managed dataset IDs from scope inputs.
src/climate_api/processing/__init__.py Introduces the processing package marker.
src/climate_api/processing/routes.py Adds /processes/resample route handler.
src/climate_api/processing/schemas.py Adds Pydantic request/response models for the resample process API.
src/climate_api/processing/services.py Adds service wrapper to run resample materialization and return dataset summaries.
src/climate_api/processing/resample.py Implements the derived resampling materialization pipeline and Zarr persistence.
src/climate_api/main.py Mounts the new processing router under /processes.
src/climate_api/ingestions/sync_engine.py Adds weekly handling in _default_target_end / _next_period_start, and marks derived datasets as not syncable (for now).
src/climate_api/ingestions/services.py Adds derived artifact storage helper and overwrite-aware artifact-record upsert support.
src/climate_api/ingestions/schemas.py Adds SyncKind.DERIVED.
src/climate_api/data_registry/services/datasets.py Adds dataset-template validation for resample blocks and supported derived sync kind.
src/climate_api/data_accessor/services/accessor.py Normalizes temporal coverage scalar conversion for numpy/xarray outputs (incl. weekly).
data/datasets/era5_land.yaml Adds derived daily ERA5-Land templates (temperature + precipitation).
data/datasets/chirps3.yaml Adds derived weekly/monthly CHIRPS3 precipitation templates.
.gitignore Ignores runtime-generated data/derived.


@turban turban left a comment


This is a solid foundation for derived datasets. The core resampling logic is well-structured, the test coverage (26 new test functions across test_processing_resample.py and test_processing_routes.py) is thorough, and the week/month edge-case handling in _drop_incomplete_edge_periods is the kind of correctness work that often gets skipped. The observations below are organised by theme.

Blockers are things that will cause a direct conflict or rework when PR #59 (CLIM-683) or the process registry (#65) lands. Suggestions are improvements that aren't blocking.


Blockers — conflicts with PR #59 (CLIM-683)

1. data_registry/services/datasets.py — reverts the importlib.resources and CLIMATE_API_CONFIG work

PR #63 is branched from main before CLIM-683 merges. The version of datasets.py on this branch retains the Path(__file__)-relative CONFIGS_DIR constant and a list_datasets() that knows nothing about CLIMATE_API_CONFIG. CLIM-683 replaces this with importlib.resources for bundled data and a merged overlay from CLIMATE_API_CONFIG.datasets_dir. When rebasing, keep the _load_builtin_datasets() / _load_from_dir() split, and integrate _validate_resample into the merged _validate_dataset_template.

2. data_registry/services/datasets.py — ingestion block validation is absent

CLIM-683 requires an ingestion.eo_function key for non-derived templates. PR #63's validator doesn't have this (it predates the rename from cache_info). After rebase, validation needs a conditional:

if sync_kind != "derived":
    # non-derived templates must declare ingestion.eo_function (from CLIM-683)
    if "eo_function" not in template.get("ingestion", {}):
        raise ValueError(f"{template['id']}: missing ingestion.eo_function")
else:
    # derived templates must declare a resample block (from PR #63)
    if "resample" not in template:
        raise ValueError(f"{template['id']}: missing resample block")

Without this, a non-derived template with a missing eo_function will load silently and fail only when the sync engine tries to call it.

3. main.py — /extents prefix reverts to pre-CLIM-683 form

CLIM-683 changes the route prefix from /extents to /extent (singular). PR #63 re-registers it at /extents. After rebase this should be /extent.

4. main.py — module-level app instead of create_app() factory

CLIM-683 wraps all router registration inside a create_app() factory function. The new processing_routes import and include_router call should go inside create_app(), not at module level.

5. Dataset YAMLs — cache_info retained; ingestion absent on derived templates

CLIM-683 renames cache_info to ingestion across all source dataset templates. The derived templates have neither, which is correct — but only once validation correctly exempts derived from the ingestion requirement (see point 2).


Blockers — conflicts with issue #65 (process registry)

6. resample YAML key will need renaming when the process registry lands

Issue #65 proposes a two-level design:

sync_kind: derived
processing:
  process_id: resample
  params:
    method: mean
    period: monthly

PR #63 uses a top-level resample: key. When normals or any second process is added, it needs its own top-level key and its own dispatch branch. Renaming resample: to processing: now (keeping the contents the same, adding process_id: resample) makes the #65 migration a schema addition rather than a rename across all built-in YAMLs.
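A minimal sketch of how dispatch could work under that two-level schema; the registry dict and handler signature here are illustrative assumptions, not the design #65 specifies:

```python
# Hypothetical process registry: one handler per process_id, so adding
# "normals" later is a registry entry, not a new top-level YAML key.
PROCESS_HANDLERS = {
    "resample": lambda params: {"process": "resample", "params": params},
}

def run_derived_template(template: dict) -> dict:
    processing = template["processing"]
    handler = PROCESS_HANDLERS[processing["process_id"]]
    return handler(processing.get("params", {}))

template = {
    "sync_kind": "derived",
    "processing": {
        "process_id": "resample",
        "params": {"method": "mean", "period": "monthly"},
    },
}
result = run_derived_template(template)
```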

7. processing/routes.py — hardcoded /resample path is not extensible

Issue #65 specifies POST /ogcapi/processes/{id}/execution. POST /processes/resample is a different surface. When normals is added, this becomes either a second hardcoded endpoint or a full refactor. Moving to /processes/{process_id}/execution now — even with a single registered process — makes the extension path clear.

8. ResampleProcessResponse.status: str is incompatible with the async 202 model in #65

The route always returns status="completed" synchronously. Issue #65 describes an async flow: 202 Accepted + jobID, polled via GET /ogcapi/jobs/{jobID}. Typing status as Literal["completed"] now at least surfaces the mismatch at the schema level when async execution is added.
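One stdlib-only way to express that constraint (on the actual Pydantic model, typing the field as the `Literal` would enforce the same thing at validation time):

```python
from typing import Literal, get_args

# Today's sync-only contract: the route can only ever report "completed".
ResampleStatus = Literal["completed"]

def validate_status(value: str) -> str:
    # Reject anything outside the Literal's allowed values at runtime,
    # mirroring what Pydantic would do on the response model.
    if value not in get_args(ResampleStatus):
        raise ValueError(f"invalid status: {value!r}")
    return value
```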


Suggestions

9. processing/resample.py — DERIVED_DATA_DIR uses __file__-relative path arithmetic

DERIVED_DATA_DIR = Path(__file__).resolve().parent.parent.parent.parent / "data" / "derived"

Four levels of .parent from a deeply nested file breaks in wheel installs exactly as the old CONFIGS_DIR did (see CLIM-683). A derived_data_dir config key (with a default relative to the config file) would be consistent with how CLIM-683 resolved datasets_dir.
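A sketch of the suggested resolution, assuming a hypothetical `derived_data_dir` config key resolved relative to the config file (mirroring how CLIM-683 resolved `datasets_dir`; the names are assumptions):

```python
from pathlib import Path

def resolve_derived_dir(config_path: Path, derived_data_dir: str = "derived") -> Path:
    """Resolve derived_data_dir against the config file's directory.

    Absolute values are taken as-is; relative values anchor to the config
    file location instead of __file__ arithmetic, so wheel installs work.
    """
    value = Path(derived_data_dir)
    return value if value.is_absolute() else config_path.parent / value
```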

10. processing/resample.py — calls three private functions from ingestion_services

_default_request_end, _normalize_request_period, and _find_existing_artifact are imported across module boundaries with underscore names. _default_request_end and _normalize_request_period are pure period utilities that belong in shared/time.py. _find_existing_artifact is a registry read that should either be made public or wrapped. This matters especially for #65's ProcessContext, which should surface these capabilities without requiring every process function to know ingestion service internals.

11. processing/resample.py — ds.chunk("auto") diverges from the pipeline's chunk strategy

build_dataset_zarr computes chunk sizes tuned to period_type. chunk("auto") delegates to Dask's heuristic, which has no knowledge of the Climate API's temporal access pattern. Consider passing period_type into _write_resampled_zarr and reusing the same sizing logic.
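A sketch of period-aware chunk selection; the table values here are illustrative placeholders, not the sizes `build_dataset_zarr` actually computes:

```python
# Hypothetical mapping from target period_type to a time-dimension chunk
# size, so derived outputs share the sizing logic of the main pipeline
# instead of Dask's generic "auto" heuristic.
PERIOD_TIME_CHUNKS = {"daily": 365, "weekly": 104, "monthly": 120}

def time_chunk_for(period_type: str, default: int = 256) -> int:
    return PERIOD_TIME_CHUNKS.get(period_type, default)

# usage inside _write_resampled_zarr (sketch):
# ds = ds.chunk({"time": time_chunk_for(target_period_type)})
```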

12. _coord_to_datetime duplicated between resample.py and shared/time.py

Both files define a private function that converts a numpy datetime scalar to a Python datetime. resample.py's version has an isinstance(value, datetime) guard the shared/time.py version lacks. Consolidate into one exported function in shared/time.py (keeping the guard).
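A possible consolidated helper that keeps the guard (sketch only; the existing conversion logic in `shared/time.py` may differ in precision handling):

```python
from datetime import datetime, timedelta
import numpy as np

def coord_to_datetime(value) -> datetime:
    """Convert a numpy datetime64 scalar (or datetime) to a naive datetime."""
    # Guard from resample.py's version: pass datetimes through unchanged.
    if isinstance(value, datetime):
        return value
    # Normalise to microsecond precision, then offset from the Unix epoch.
    micros = int(np.datetime64(value, "us").astype("int64"))
    return datetime(1970, 1, 1) + timedelta(microseconds=micros)
```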

13. processing/routes.py — imports private _get_dataset_or_404 from a sibling router

from climate_api.data_registry.routes import _get_dataset_or_404

This couples two route modules and bypasses the service layer. Call registry_datasets.get_dataset(dataset_id) directly and raise HTTPException(404) at the call site, matching the pattern in extents/routes.py.

14. processing/routes.py — no sync_kind guard at the route level

If a caller sends a non-derived dataset id, the error surfaces as HTTPException(400, "Dataset ... is not configured for resampling") from deep inside the service. A one-line check at the route entry point:

if dataset.get("sync_kind") != "derived":
    raise HTTPException(status_code=400, detail=f"'{request.dataset_id}' is not a derived dataset")

makes the intent clear and avoids traversing normalisation and source-resolution logic for the common miscall case. A route-level test for this path is also missing from test_processing_routes.py.

15. ingestions/sync_engine.py — datetime.min.time() in the weekly branch

datetime.min.time() reads oddly — time() or time(0) is clearer that midnight is intended. The daily branch just adds timedelta(days=1) to a date; keeping the weekly branch at the same level of abstraction would be consistent.
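For reference, the two idioms denote the same value:

```python
from datetime import datetime, time

# datetime.min.time() and the bare time() constructor both name midnight;
# the latter is the shorter idiom suggested above.
midnight_a = datetime.min.time()
midnight_b = time()
```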

16. ingestions/services.py:store_materialized_zarr_artifact — parameters shadowed by normalised versions

start and end are rebound in-place after normalisation. A local name (normalised_start, normalised_end) prevents accidentally using the un-normalised value above the rebind.


Summary

#   Location                              Theme                                   Kind
1   data_registry/services/datasets.py    importlib.resources regression          Blocker vs #59
2   data_registry/services/datasets.py    ingestion validation removed            Blocker vs #59
3   main.py                               /extents → /extent prefix               Blocker vs #59
4   main.py                               module-level app vs create_app()        Blocker vs #59
5   data/datasets/*.yaml                  cache_info → ingestion                  Blocker vs #59
6   data/datasets/*.yaml                  resample: key vs processing.process_id  Blocker vs #65
7   processing/routes.py                  hardcoded /resample path                Blocker vs #65
8   processing/schemas.py                 status: str vs async 202 model          Blocker vs #65
9   processing/resample.py                __file__-relative DERIVED_DATA_DIR      Suggestion
10  processing/resample.py                private cross-module calls              Suggestion
11  processing/resample.py                chunk("auto") inconsistency             Suggestion
12  resample.py / shared/time.py          _coord_to_datetime duplication          Suggestion
13  processing/routes.py                  private import from sibling router      Suggestion
14  processing/routes.py                  missing sync_kind guard                 Suggestion
15  ingestions/sync_engine.py             datetime.min.time() clarity             Suggestion
16  ingestions/services.py                parameter shadowing                     Suggestion
