feat: extensible transforms, process registry, temporal resampling, and reprojection (CLIM-679) #87
Merged
23 commits
0e57332 Reapply "feat: extensible transforms pipeline for zarr build" (turban)
4110792 fix: move transforms package to flat layout (climate_api/transforms/) (turban)
470b549 Reapply "feat: temporal resampling for derived datasets (CLIM-679)" (turban)
686b42f fix: move processing package to flat layout (climate_api/processing/) (turban)
74a8b50 Merge branch 'main' into restore/temporal-resampling (turban)
18b4f0d Merge branch 'main' into restore/transforms-pipeline (turban)
f4d3021 refactor: decouple temporal resampling from dataset templates (turban)
c8e0d2d refactor: replace period_type+week_start with pandas frequency alias (turban)
5fde859 chore: merge main into restore/temporal-resampling (turban)
3f96a3e chore: merge restore/transforms-pipeline into restore/temporal-resamp… (turban)
4bf7fe1 Merge remote-tracking branch 'origin/main' into restore/temporal-resa… (turban)
842b683 feat: process registry with plugin support — mirrors dataset registry… (turban)
cb74557 Merge remote-tracking branch 'origin/main' into restore/temporal-resa… (turban)
93cf871 fix: add return type annotation to _get_dynamic_function (turban)
fd76922 feat: add reproject_to_instance_crs transform to zarr build pipeline (turban)
771a053 Merge pull request #93 from dhis2/feat/reproject-transform (turban)
3d439c2 refactor: make reprojection implicit in build_dataset_zarr pipeline (turban)
f0bc91d fix: address Copilot review issues in transforms, resample, and proce… (turban)
ea525b4 refactor: nest sync_kind/sync_execution/sync_availability under sync:… (turban)
ac664ae refactor: remove deaccumulate_era5 transform from this PR (turban)
6277a0a refactor: replace generic convert_units with named kelvin_to_celsius … (turban)
1845b2f docs: add extensibility guide and update adding_custom_datasets for n… (turban)
0babfff fix: pre-merge cleanup from PR review (turban)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -6,6 +6,7 @@ climate-api.yaml | ||
| eo_api.egg-info/ | ||
| data/downloads | ||
| data/artifacts | ||
| data/derived | ||
| data/pygeoapi | ||
| docs/internal/ | ||
| site/ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| - id: resample | ||
|   name: Temporal resampling | ||
|   description: Aggregate a source dataset to a coarser temporal resolution using pandas frequency aliases (e.g. 1D, W-MON, MS). | ||
|   execution_function: climate_api.processing.services.execute_resample | ||
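
The `resample` definition above only names the frequency aliases; the body of `execute_resample` is not part of this diff. As a rough illustration of what aliases such as `MS` and `W-MON` mean, here is a minimal pandas sketch. The daily series, its constant values, and the use of `sum` as the aggregation are assumptions made for this example, not the PR's implementation.

```python
import pandas as pd

# Hypothetical daily series: 1.0 per day for 2024-01-01 .. 2024-02-29 (60 days).
index = pd.date_range("2024-01-01", periods=60, freq="D")
daily = pd.Series(1.0, index=index)

# "MS" (month start): one bin per calendar month, labelled with the first day
# of that month.
monthly = daily.resample("MS").sum()

# "W-MON": weekly bins anchored on Monday. With pandas defaults, weekly bins
# are closed and labelled on their right edge, so every label is a Monday.
weekly = daily.resample("W-MON").sum()

print(monthly)
print(weekly.head(3))
```

Because the aggregation is a sum of ones, each monthly value equals the number of days in that month, which makes it easy to see where the bin edges fall. A real process would presumably choose the aggregation (sum, mean, min, max) per variable.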
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| """Process registry backed by YAML config files.""" | ||
| | ||
| import importlib | ||
| import importlib.resources | ||
| import logging | ||
| import sys | ||
| from pathlib import Path | ||
| from typing import Any | ||
| | ||
| import yaml | ||
| | ||
| from climate_api import config as api_config | ||
| | ||
| logger = logging.getLogger(__name__) | ||
| | ||
| # Overridden in tests via monkeypatch to point to a temporary directory. | ||
| # When set, only this directory is loaded (no built-ins, no config override). | ||
| CONFIGS_DIR: Path | None = None | ||
| | ||
| | ||
| def list_processes() -> list[dict[str, Any]]: | ||
|     """Load all process definitions and return a flat list. | ||
| | ||
|     Built-in definitions from climate_api/data/processes/ are always loaded. When | ||
|     plugins_dir is set in CLIMATE_API_CONFIG, definitions from plugins_dir/processes/ | ||
|     are merged on top — a custom definition with the same id overrides the built-in. | ||
| | ||
|     CONFIGS_DIR (test override via monkeypatch) bypasses this and loads only | ||
|     from the given directory. | ||
|     """ | ||
|     if CONFIGS_DIR is not None: | ||
|         return _load_from_dir(CONFIGS_DIR) | ||
| | ||
|     merged: dict[str, dict[str, Any]] = {p["id"]: p for p in _load_builtin_processes()} | ||
| | ||
|     config = api_config.get_config() | ||
|     config_plugins_dir = config.get("plugins_dir") | ||
|     if config_plugins_dir: | ||
|         if not isinstance(config_plugins_dir, (str, Path)): | ||
|             raise ValueError( | ||
|                 f"plugins_dir in CLIMATE_API_CONFIG must be a path string, got {type(config_plugins_dir).__name__}" | ||
|             ) | ||
|         config_path = api_config.get_config_path() | ||
|         base = config_path.parent if config_path else Path() | ||
|         root = (base / config_plugins_dir).resolve() | ||
|         if not root.is_dir(): | ||
|             raise ValueError(f"plugins_dir '{root}' does not exist or is not a directory") | ||
|         root_str = str(root) | ||
|         if root_str not in sys.path: | ||
|             sys.path.append(root_str) | ||
|         processes_subdir = root / "processes" | ||
|         if processes_subdir.is_dir(): | ||
|             for process in _load_from_dir(processes_subdir): | ||
|                 merged[process["id"]] = process | ||
| | ||
|     return list(merged.values()) | ||
| | ||
| | ||
| def get_process(process_id: str) -> dict[str, Any] | None: | ||
|     """Get process definition for a given id.""" | ||
|     return {p["id"]: p for p in list_processes()}.get(process_id) | ||
| | ||
| | ||
| def _load_builtin_processes() -> list[dict[str, Any]]: | ||
|     """Load built-in process definitions from package data via importlib.resources.""" | ||
|     pkg = importlib.resources.files("climate_api") / "data" / "processes" | ||
|     processes: list[dict[str, Any]] = [] | ||
|     for resource in pkg.iterdir(): | ||
|         if not resource.name.endswith((".yaml", ".yml")): | ||
|             continue | ||
|         try: | ||
|             content = resource.read_text(encoding="utf-8") | ||
|             file_processes = yaml.safe_load(content) | ||
|             if not isinstance(file_processes, list): | ||
|                 raise ValueError(f"{resource.name} must contain a list of process definitions") | ||
|             for process in file_processes: | ||
|                 _validate_process(process, source=resource.name) | ||
|             processes.extend(file_processes) | ||
|         except Exception: | ||
|             logger.exception("Error loading %s", resource.name) | ||
|             raise | ||
|     return processes | ||
| | ||
| | ||
| def _load_from_dir(folder: Path) -> list[dict[str, Any]]: | ||
|     """Load process definitions from a directory on disk.""" | ||
|     processes: list[dict[str, Any]] = [] | ||
| | ||
|     if not folder.is_dir(): | ||
|         raise ValueError(f"Path is not a directory: {folder}") | ||
| | ||
|     for file_path in folder.glob("*.y*ml"): | ||
|         try: | ||
|             with open(file_path, encoding="utf-8") as f: | ||
|                 file_processes = yaml.safe_load(f) | ||
|             if not isinstance(file_processes, list): | ||
|                 raise ValueError(f"{file_path.name} must contain a list of process definitions") | ||
|             for process in file_processes: | ||
|                 _validate_process(process, source=str(file_path)) | ||
|             processes.extend(file_processes) | ||
|         except Exception: | ||
|             logger.exception("Error loading %s", file_path.name) | ||
|             raise | ||
| | ||
|     return processes | ||
| | ||
| | ||
| def _validate_process(process: object, *, source: str) -> None: | ||
|     """Validate a process definition dict.""" | ||
|     if not isinstance(process, dict): | ||
|         raise ValueError(f"{source} contains a non-object process definition") | ||
| | ||
|     process_id = process.get("id") | ||
|     if not isinstance(process_id, str) or not process_id: | ||
|         raise ValueError(f"{source} contains a process definition with a missing or invalid id") | ||
| | ||
|     name = process.get("name") | ||
|     if not isinstance(name, str) or not name: | ||
|         raise ValueError(f"Process '{process_id}' in {source} must define name") | ||
| | ||
|     execution_function = process.get("execution_function") | ||
|     if not isinstance(execution_function, str) or not execution_function: | ||
|         raise ValueError(f"Process '{process_id}' in {source} must define execution_function") | ||
| | ||
| | ||
| def _get_dynamic_function(full_path: str) -> Any: | ||
|     """Import and return a function given its dotted module path.""" | ||
|     parts = [p for p in full_path.split(".") if p] | ||
|     if len(parts) < 2: | ||
|         raise ValueError( | ||
|             f"execution_function must be a dotted path with at least one module and one attribute, got '{full_path}'" | ||
|         ) | ||
|     module_path = ".".join(parts[:-1]) | ||
|     function_name = parts[-1] | ||
|     module = importlib.import_module(module_path) | ||
|     return getattr(module, function_name) | ||
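
To make the two mechanisms in this module concrete, the following is a minimal, self-contained sketch: merging definitions by `id` so that later entries override earlier ones, and resolving a dotted `execution_function` path with `importlib`. The helper names `merge_by_id` and `resolve_dotted_path` are hypothetical, the `anomaly` process is invented for illustration, and the `statistics.*` paths are stand-ins chosen so the example runs without `climate_api` installed.

```python
import importlib
from typing import Any, Callable

ProcessDef = dict[str, Any]


def merge_by_id(builtin: list[ProcessDef], custom: list[ProcessDef]) -> list[ProcessDef]:
    """Merge two definition lists; a custom entry replaces a built-in with the same id."""
    merged = {p["id"]: p for p in builtin}
    for process in custom:
        merged[process["id"]] = process
    return list(merged.values())


def resolve_dotted_path(full_path: str) -> Callable[..., Any]:
    """Import 'pkg.module.attr' and return the attribute (hypothetical helper)."""
    parts = [p for p in full_path.split(".") if p]
    if len(parts) < 2:
        raise ValueError(f"expected a dotted path like 'module.function', got '{full_path}'")
    module = importlib.import_module(".".join(parts[:-1]))
    return getattr(module, parts[-1])


# Stand-in definitions: stdlib functions replace real execution functions.
builtin = [
    {"id": "resample", "name": "Temporal resampling", "execution_function": "statistics.mean"},
]
custom = [
    {"id": "resample", "name": "Custom resampling", "execution_function": "statistics.median"},
    {"id": "anomaly", "name": "Anomaly (made up)", "execution_function": "statistics.pstdev"},
]

processes = merge_by_id(builtin, custom)
by_id = {p["id"]: p for p in processes}

# The custom "resample" wins, so the resolved function is statistics.median.
func = resolve_dotted_path(by_id["resample"]["execution_function"])
result = func([1, 2, 10])
print([p["id"] for p in processes], result)
```

Keying the merge on `id` keeps the override rule simple: whichever definition is loaded last for a given id is the one callers see, and a dict preserves the position of the first occurrence of each id.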