Merged

23 commits
0e57332
Reapply "feat: extensible transforms pipeline for zarr build"
turban May 9, 2026
4110792
fix: move transforms package to flat layout (climate_api/transforms/)
turban May 9, 2026
470b549
Reapply "feat: temporal resampling for derived datasets (CLIM-679)"
turban May 9, 2026
686b42f
fix: move processing package to flat layout (climate_api/processing/)
turban May 9, 2026
74a8b50
Merge branch 'main' into restore/temporal-resampling
turban May 9, 2026
18b4f0d
Merge branch 'main' into restore/transforms-pipeline
turban May 9, 2026
f4d3021
refactor: decouple temporal resampling from dataset templates
turban May 9, 2026
c8e0d2d
refactor: replace period_type+week_start with pandas frequency alias
turban May 9, 2026
5fde859
chore: merge main into restore/temporal-resampling
turban May 9, 2026
3f96a3e
chore: merge restore/transforms-pipeline into restore/temporal-resamp…
turban May 9, 2026
4bf7fe1
Merge remote-tracking branch 'origin/main' into restore/temporal-resa…
turban May 9, 2026
842b683
feat: process registry with plugin support — mirrors dataset registry…
turban May 9, 2026
cb74557
Merge remote-tracking branch 'origin/main' into restore/temporal-resa…
turban May 9, 2026
93cf871
fix: add return type annotation to _get_dynamic_function
turban May 9, 2026
fd76922
feat: add reproject_to_instance_crs transform to zarr build pipeline
turban May 9, 2026
771a053
Merge pull request #93 from dhis2/feat/reproject-transform
turban May 9, 2026
3d439c2
refactor: make reprojection implicit in build_dataset_zarr pipeline
turban May 9, 2026
f0bc91d
fix: address Copilot review issues in transforms, resample, and proce…
turban May 9, 2026
ea525b4
refactor: nest sync_kind/sync_execution/sync_availability under sync:…
turban May 9, 2026
ac664ae
refactor: remove deaccumulate_era5 transform from this PR
turban May 9, 2026
6277a0a
refactor: replace generic convert_units with named kelvin_to_celsius …
turban May 9, 2026
1845b2f
docs: add extensibility guide and update adding_custom_datasets for n…
turban May 9, 2026
0babfff
fix: pre-merge cleanup from PR review
turban May 9, 2026
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ climate-api.yaml
 eo_api.egg-info/
 data/downloads
 data/artifacts
+data/derived
 data/pygeoapi
 docs/internal/
 site/
9 changes: 5 additions & 4 deletions climate_api/data/datasets/chirps3.yaml
@@ -3,10 +3,11 @@
   short_name: Total precipitation
   variable: precip
   period_type: daily
-  sync_kind: temporal
-  sync_execution: append
-  sync_availability:
-    latest_available_function: climate_api.providers.availability.chirps3_daily_latest_available
+  sync:
+    kind: temporal
+    execution: append
+    availability:
+      latest_available_function: climate_api.providers.availability.chirps3_daily_latest_available
   extents:
     spatial:
       bbox: [-180, -50, 180, 50]
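For context on this migration, which repeats in every template below: the three flat sync_* keys collapse into one nested sync mapping. As the YAML parses into Python (values copied from the chirps3 template above):

# Old flat shape
old = {
    "sync_kind": "temporal",
    "sync_execution": "append",
    "sync_availability": {
        "latest_available_function": "climate_api.providers.availability.chirps3_daily_latest_available",
    },
}

# New nested shape
new = {
    "sync": {
        "kind": "temporal",
        "execution": "append",
        "availability": {
            "latest_available_function": "climate_api.providers.availability.chirps3_daily_latest_available",
        },
    },
}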
35 changes: 19 additions & 16 deletions climate_api/data/datasets/era5_land.yaml
@@ -3,11 +3,12 @@
   short_name: 2m temperature
   variable: t2m
   period_type: hourly
-  sync_kind: temporal
-  sync_execution: append
-  sync_availability:
-    latest_available_function: climate_api.providers.availability.lagged_latest_available
-    lag_hours: 120
+  sync:
+    kind: temporal
+    execution: append
+    availability:
+      latest_available_function: climate_api.providers.availability.lagged_latest_available
+      lag_hours: 120
   extents:
     spatial:
       bbox: [-180, -90, 180, 90]
@@ -20,25 +21,27 @@
     function: dhis2eo.data.destine.era5_land.hourly.download
     default_params:
       variables: ['t2m']
-  units: kelvin
-  convert_units: degC
+  transforms:
+    - climate_api.transforms.kelvin_to_celsius
+  units: degC
   resolution: 9 km x 9 km
   source: ERA5-Land Reanalysis
   source_url: https://earthdatahub.destine.eu/collections/era5/datasets/reanalysis-era5-land
   display:
     colormap: rdbu_r
-    range: [233.0, 313.0]
+    range: [15.0, 40.0]
 
 - id: era5land_precipitation_hourly
   name: Total precipitation (ERA5-Land)
   short_name: Total precipitation
   variable: tp
   period_type: hourly
-  sync_kind: temporal
-  sync_execution: append
-  sync_availability:
-    latest_available_function: climate_api.providers.availability.lagged_latest_available
-    lag_hours: 120
+  sync:
+    kind: temporal
+    execution: append
+    availability:
+      latest_available_function: climate_api.providers.availability.lagged_latest_available
+      lag_hours: 120
   extents:
     spatial:
       bbox: [-180, -90, 180, 90]
@@ -51,9 +54,9 @@
     function: dhis2eo.data.destine.era5_land.hourly.download
     default_params:
       variables: ['tp']
-  pre_process: ['deaccumulate_era5']
-  units: m
-  convert_units: mm
+  transforms:
+    - climate_api.transforms.metres_to_mm
+  units: mm
   resolution: 9 km x 9 km
   source: ERA5-Land Reanalysis
   source_url: https://earthdatahub.destine.eu/collections/era5/datasets/reanalysis-era5-land
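The transforms listed above are resolved by dotted path and applied in order during the zarr build (see _run_transforms in downloader.py further down). A minimal sketch of what a transform such as kelvin_to_celsius might look like; the real implementation is not part of this diff, so the body is an assumption:

import xarray as xr

def kelvin_to_celsius(ds: xr.Dataset, dataset: dict, **params) -> xr.Dataset:
    # Transforms receive the dataset being built plus its template dict and
    # must return a (possibly new) xarray.Dataset.
    var = dataset["variable"]        # e.g. "t2m" for the template above
    out = ds.copy()
    out[var] = ds[var] - 273.15      # K -> degC
    out[var].attrs["units"] = "degC"
    return out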
11 changes: 6 additions & 5 deletions climate_api/data/datasets/worldpop.yaml
@@ -3,11 +3,12 @@
   short_name: Total population
   variable: pop_total
   period_type: yearly
-  sync_kind: release
-  sync_availability:
-    latest_available_function: climate_api.providers.availability.worldpop_release_latest_available
-    # WorldPop projections are intentionally request-driven for future years.
-    allow_future: true
+  sync:
+    kind: release
+    availability:
+      latest_available_function: climate_api.providers.availability.worldpop_release_latest_available
+      # WorldPop projections are intentionally request-driven for future years.
+      allow_future: true
   extents:
     spatial:
       bbox: [-180, -90, 180, 90]
4 changes: 4 additions & 0 deletions climate_api/data/processes/resample.yaml
@@ -0,0 +1,4 @@
+- id: resample
+  name: Temporal resampling
+  description: Aggregate a source dataset to a coarser temporal resolution using pandas frequency aliases (e.g. 1D, W-MON, MS).
+  execution_function: climate_api.processing.services.execute_resample
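The execution function itself is not shown in this diff. As a sketch of the behaviour the description implies (assumed, not the actual climate_api.processing.services.execute_resample), these pandas frequency aliases map directly onto xarray's resample:

import numpy as np
import pandas as pd
import xarray as xr

# Hourly toy series standing in for a source dataset.
times = pd.date_range("2026-01-01", periods=48, freq="h")
ds = xr.Dataset({"tp": ("time", np.random.rand(48))}, coords={"time": times})

daily = ds.resample(time="1D").sum()      # 1D: daily totals
weekly = ds.resample(time="W-MON").sum()  # W-MON: weeks anchored on Monday
monthly = ds.resample(time="MS").sum()    # MS: month-start labelled periods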
12 changes: 10 additions & 2 deletions climate_api/data_accessor/services/accessor.py
@@ -5,6 +5,7 @@
 import tempfile
 from typing import Any
 
+import numpy as np
 import xarray as xr
 
 from ...data_manager.services.downloader import get_cache_files, get_zarr_path
@@ -135,8 +136,8 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
     time_dim = get_time_dim(ds)
     lon_dim, lat_dim = get_lon_lat_dims(ds)
 
-    start = numpy_datetime_to_period_string(ds[time_dim].min(), period_type)  # type: ignore[arg-type]
-    end = numpy_datetime_to_period_string(ds[time_dim].max(), period_type)  # type: ignore[arg-type]
+    start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type))  # type: ignore[arg-type]
+    end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type))  # type: ignore[arg-type]
 
     xmin, xmax = ds[lon_dim].min().item(), ds[lon_dim].max().item()
     ymin, ymax = ds[lat_dim].min().item(), ds[lat_dim].max().item()
@@ -150,6 +151,13 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
     }
 
 
+def _period_string_scalar(value: Any) -> str:
+    """Normalize a numpy scalar or 0-d array period string to plain Python str."""
+    if isinstance(value, np.ndarray):
+        return str(value.item())
+    return str(value)
+
+
 def xarray_to_temporary_netcdf(ds: xr.Dataset) -> str:
     """Write a dataset to a temporary NetCDF file and return the path."""
     fd = tempfile.NamedTemporaryFile(suffix=".nc", delete=False)
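A quick illustration of the case the new helper guards against; the return type of numpy_datetime_to_period_string is not visible in this diff, so the 0-d array input is an assumption:

import numpy as np

raw = np.array("2026-05-09")                                # 0-d ndarray, not a str
assert isinstance(raw, np.ndarray)
assert _period_string_scalar(raw) == "2026-05-09"           # unwrapped via .item()
assert _period_string_scalar("2026-05-09") == "2026-05-09"  # plain str passthrough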
28 changes: 28 additions & 0 deletions climate_api/data_manager/services/downloader.py
@@ -17,6 +17,7 @@
 from topozarr.coarsen import create_pyramid
 
 from climate_api import config as api_config
+from climate_api.transforms.reproject import reproject_to_instance_crs
 
 from .utils import get_lon_lat_dims, get_time_dim
 
@@ -139,6 +140,10 @@ def build_dataset_zarr(dataset: dict[str, Any], *, start: str | None = None, end
     dims = [lon_dim, lat_dim]
 
     ds = _select_time_range(ds, dataset=dataset, start=start, end=end)
+    ds = _run_transforms(ds, dataset)
+
+    source_crs: str = dataset.get("source_crs", "EPSG:4326")
+    ds = reproject_to_instance_crs(ds, dataset, source_crs=source_crs)
 
     xmin = ds[lon_dim].min().item()
     xmax = ds[lon_dim].max().item()
@@ -239,6 +244,29 @@ def _select_time_range(
     return selected
 
 
+def _run_transforms(ds: xr.Dataset, dataset: dict[str, Any]) -> xr.Dataset:
+    dataset_id = dataset.get("id", "?")
+    for entry in dataset.get("transforms", []):
+        if isinstance(entry, str):
+            func_path, params = entry, {}
+        elif isinstance(entry, dict):
+            if "function" not in entry:
+                raise ValueError(
+                    f"Transform entry in dataset '{dataset_id}' is missing required key 'function': {entry!r}"
+                )
+            func_path = entry["function"]
+            params = entry.get("params", {})
+        else:
+            raise ValueError(
+                f"Transform entry in dataset '{dataset_id}' must be a string or dict,"
+                f" got {type(entry).__name__!r}: {entry!r}"
+            )
+        func = _get_dynamic_function(func_path)
+        logger.info("Applying transform %s to dataset %s", func_path, dataset_id)
+        ds = func(ds, dataset, **params)
+    return ds
+
+
 def _compute_time_space_chunks(
     ds: xr.Dataset,
     dataset: dict[str, Any],
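_run_transforms accepts two entry shapes. A hypothetical template fragment exercising both; the empty params mapping is shown only for illustration, since the built-in transforms in this PR take no parameters:

dataset = {
    "id": "era5land_temperature_hourly",
    "transforms": [
        "climate_api.transforms.kelvin_to_celsius",         # string: dotted path only
        {"function": "climate_api.transforms.kelvin_to_celsius",
         "params": {}},                                      # dict: path plus optional params
    ],
}
# Each resolved callable is applied as: ds = func(ds, dataset, **params)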
19 changes: 10 additions & 9 deletions climate_api/data_registry/services/datasets.py
@@ -127,24 +127,25 @@ def _validate_dataset_template(dataset: object, *, source: str) -> None:
     dataset_id = dataset.get("id")
     if not isinstance(dataset_id, str) or not dataset_id:
         raise ValueError(f"{source} contains a dataset template with a missing or invalid id")
-    sync_kind = dataset.get("sync_kind")
+    sync_block = dataset.get("sync", {})
+    sync_kind = sync_block.get("kind") if isinstance(sync_block, dict) else None
     if not isinstance(sync_kind, str) or not sync_kind:
-        raise ValueError(f"Dataset template '{dataset_id}' in {source} must define sync_kind")
+        raise ValueError(f"Dataset template '{dataset_id}' in {source} must define sync.kind")
     if sync_kind not in SUPPORTED_SYNC_KINDS:
         supported = ", ".join(sorted(SUPPORTED_SYNC_KINDS))
         raise ValueError(
-            f"Dataset template '{dataset_id}' in {source} has unsupported sync_kind "
+            f"Dataset template '{dataset_id}' in {source} has unsupported sync.kind "
             f"'{sync_kind}'. Supported values: {supported}"
         )
 
-    sync_execution = dataset.get("sync_execution")
+    sync_execution = sync_block.get("execution") if isinstance(sync_block, dict) else None
     if sync_execution is not None:
         if not isinstance(sync_execution, str) or not sync_execution:
-            raise ValueError(f"Dataset template '{dataset_id}' in {source} has invalid sync_execution")
+            raise ValueError(f"Dataset template '{dataset_id}' in {source} has invalid sync.execution")
         if sync_execution not in SUPPORTED_SYNC_EXECUTIONS:
             supported = ", ".join(sorted(SUPPORTED_SYNC_EXECUTIONS))
             raise ValueError(
-                f"Dataset template '{dataset_id}' in {source} has unsupported sync_execution "
+                f"Dataset template '{dataset_id}' in {source} has unsupported sync.execution "
                 f"'{sync_execution}'. Supported values: {supported}"
             )
 
@@ -155,20 +156,20 @@ def _validate_dataset_template(dataset: object, *, source: str) -> None:
     if not isinstance(function, str) or not function:
         raise ValueError(f"Dataset template '{dataset_id}' in {source} must define ingestion.function")
 
-    sync_availability = dataset.get("sync_availability")
+    sync_availability = sync_block.get("availability") if isinstance(sync_block, dict) else None
     if sync_availability is not None:
         _validate_sync_availability(sync_availability, dataset_id=dataset_id, source=source)
 
 
 def _validate_sync_availability(sync_availability: object, *, dataset_id: str, source: str) -> None:
     """Validate optional source availability policy metadata."""
     if not isinstance(sync_availability, dict):
-        raise ValueError(f"Dataset template '{dataset_id}' in {source} has invalid sync_availability")
+        raise ValueError(f"Dataset template '{dataset_id}' in {source} has invalid sync.availability")
 
     latest_available_function = sync_availability.get("latest_available_function")
     if latest_available_function is not None and (
         not isinstance(latest_available_function, str) or not latest_available_function
     ):
         raise ValueError(
-            f"Dataset template '{dataset_id}' in {source} has invalid sync_availability.latest_available_function"
+            f"Dataset template '{dataset_id}' in {source} has invalid sync.availability.latest_available_function"
         )
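A minimal template dict that satisfies the validator after this change, sketched with hypothetical ids and dotted paths and showing only the keys checked above:

template = {
    "id": "example_dataset",                      # hypothetical
    "sync": {
        "kind": "temporal",                       # required; must be a supported kind
        "execution": "append",                    # optional; checked if present
        "availability": {                         # optional
            "latest_available_function": "my_pkg.availability.latest",  # hypothetical
        },
    },
    "ingestion": {"function": "my_pkg.ingest.download"},  # hypothetical
}
_validate_dataset_template(template, source="example.yaml")  # passes silently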
136 changes: 136 additions & 0 deletions climate_api/data_registry/services/processes.py
@@ -0,0 +1,136 @@
+"""Process registry backed by YAML config files."""
+
+import importlib
+import importlib.resources
+import logging
+import sys
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+from climate_api import config as api_config
+
+logger = logging.getLogger(__name__)
+
+# Overridden in tests via monkeypatch to point to a temporary directory.
+# When set, only this directory is loaded (no built-ins, no config override).
+CONFIGS_DIR: Path | None = None
+
+
+def list_processes() -> list[dict[str, Any]]:
+    """Load all process definitions and return a flat list.
+
+    Built-in definitions from climate_api/data/processes/ are always loaded. When
+    plugins_dir is set in CLIMATE_API_CONFIG, definitions from plugins_dir/processes/
+    are merged on top — a custom definition with the same id overrides the built-in.
+
+    CONFIGS_DIR (test override via monkeypatch) bypasses this and loads only
+    from the given directory.
+    """
+    if CONFIGS_DIR is not None:
+        return _load_from_dir(CONFIGS_DIR)
+
+    merged: dict[str, dict[str, Any]] = {p["id"]: p for p in _load_builtin_processes()}
+
+    config = api_config.get_config()
+    config_plugins_dir = config.get("plugins_dir")
+    if config_plugins_dir:
+        if not isinstance(config_plugins_dir, (str, Path)):
+            raise ValueError(
+                f"plugins_dir in CLIMATE_API_CONFIG must be a path string, got {type(config_plugins_dir).__name__}"
+            )
+        config_path = api_config.get_config_path()
+        base = config_path.parent if config_path else Path()
+        root = (base / config_plugins_dir).resolve()
+        if not root.is_dir():
+            raise ValueError(f"plugins_dir '{root}' does not exist or is not a directory")
+        root_str = str(root)
+        if root_str not in sys.path:
+            sys.path.append(root_str)
+        processes_subdir = root / "processes"
+        if processes_subdir.is_dir():
+            for process in _load_from_dir(processes_subdir):
+                merged[process["id"]] = process
+
+    return list(merged.values())
+
+
+def get_process(process_id: str) -> dict[str, Any] | None:
+    """Get process definition for a given id."""
+    return {p["id"]: p for p in list_processes()}.get(process_id)
+
+
+def _load_builtin_processes() -> list[dict[str, Any]]:
+    """Load built-in process definitions from package data via importlib.resources."""
+    pkg = importlib.resources.files("climate_api") / "data" / "processes"
+    processes: list[dict[str, Any]] = []
+    for resource in pkg.iterdir():
+        if not resource.name.endswith((".yaml", ".yml")):
+            continue
+        try:
+            content = resource.read_text(encoding="utf-8")
+            file_processes = yaml.safe_load(content)
+            if not isinstance(file_processes, list):
+                raise ValueError(f"{resource.name} must contain a list of process definitions")
+            for process in file_processes:
+                _validate_process(process, source=resource.name)
+            processes.extend(file_processes)
+        except Exception:
+            logger.exception("Error loading %s", resource.name)
+            raise
+    return processes
+
+
+def _load_from_dir(folder: Path) -> list[dict[str, Any]]:
+    """Load process definitions from a directory on disk."""
+    processes: list[dict[str, Any]] = []
+
+    if not folder.is_dir():
+        raise ValueError(f"Path is not a directory: {folder}")
+
+    for file_path in folder.glob("*.y*ml"):
+        try:
+            with open(file_path, encoding="utf-8") as f:
+                file_processes = yaml.safe_load(f)
+            if not isinstance(file_processes, list):
+                raise ValueError(f"{file_path.name} must contain a list of process definitions")
+            for process in file_processes:
+                _validate_process(process, source=str(file_path))
+            processes.extend(file_processes)
+        except Exception:
+            logger.exception("Error loading %s", file_path.name)
+            raise
+
+    return processes
+
+
+def _validate_process(process: object, *, source: str) -> None:
+    """Validate a process definition dict."""
+    if not isinstance(process, dict):
+        raise ValueError(f"{source} contains a non-object process definition")
+
+    process_id = process.get("id")
+    if not isinstance(process_id, str) or not process_id:
+        raise ValueError(f"{source} contains a process definition with a missing or invalid id")
+
+    name = process.get("name")
+    if not isinstance(name, str) or not name:
+        raise ValueError(f"Process '{process_id}' in {source} must define name")
+
+    execution_function = process.get("execution_function")
+    if not isinstance(execution_function, str) or not execution_function:
+        raise ValueError(f"Process '{process_id}' in {source} must define execution_function")
+
+
+def _get_dynamic_function(full_path: str) -> Any:
+    """Import and return a function given its dotted module path."""
+    parts = [p for p in full_path.split(".") if p]
+    if len(parts) < 2:
+        raise ValueError(
+            f"execution_function must be a dotted path with at least one module and one attribute, got '{full_path}'"
+        )
+    module_path = ".".join(parts[:-1])
+    function_name = parts[-1]
+    module = importlib.import_module(module_path)
+    return getattr(module, function_name)
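Putting the registry together, a usage sketch against the built-in resample definition; the import path is inferred from the file location, and _get_dynamic_function is module-private, called here only for illustration:

from climate_api.data_registry.services import processes

available = processes.list_processes()           # built-ins plus plugins_dir overrides
definition = processes.get_process("resample")   # the YAML definition above, as a dict

if definition is not None:
    # Resolve the dotted execution_function path to a callable.
    execute = processes._get_dynamic_function(definition["execution_function"])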