From cef6d663125cbffb6a53fb898dd243e981801617 Mon Sep 17 00:00:00 2001 From: sujata-m Date: Thu, 7 May 2026 19:09:33 +0530 Subject: [PATCH] [0.4.0] Add geometry-to-node coordinate mapping validation ## Dev Board Ticket - https://dev.azure.com/TDEI-UW/TDEI/_workitems/edit/3607/ ## Changes - Added geometry mapping consistency validation for OSW datasets in `OSWValidation.validate()`: - The node referenced by `edges._u_id` must have the same coordinate as the edge's start point. - The node referenced by `edges._v_id` must have the same coordinate as the edge's end point. - The coordinates of the nodes referenced by `zones._w_id` must be vertices of the zone polygon. - Added standalone pure-Python geometry mapping validator module with reusable helpers for node indexing and edge/zone mapping checks. - Improved validation error reporting for geometry mapping mismatches with detailed, actionable context (dataset name, feature index/id, referenced node id, expected vs actual coordinates). - Bumped library version to `0.4.0` and updated changelog entries for the new geometry mapping feature set. - Added new geometry mapping fixtures and expanded unit test coverage across validator-level and end-to-end validation paths. ## Testing - Wrote **35 new unit test cases**: - 19 in `test_geometry_mapping_validator.py` - 10 in `test_osw_validation_extras.py` - 6 in `test_osw_validation.py` - Added dedicated unit tests for standalone geometry mapping validator covering valid cases, missing refs, unknown refs, coordinate mismatches, and integration aggregation. - Added end-to-end validation tests for `_u_id`, `_v_id`, and `_w_id` coordinate mismatch scenarios using fixture ZIP datasets. - Added mock-driven validation tests to ensure geometry mapping checks are exercised through `validate()` behavior and issue metadata output. 
--- CHANGELOG.md | 7 + src/python_osw_validation/__init__.py | 159 +++++++- .../geometry_mapping_validator.py | 374 ++++++++++++++++++ src/python_osw_validation/version.py | 2 +- tests/assets/edge_u_id_coord_mismatch.zip | Bin 0 -> 1142 bytes tests/assets/edge_v_id_coord_mismatch.zip | Bin 0 -> 940 bytes tests/assets/geom_mapping_valid.zip | Bin 0 -> 974 bytes tests/assets/zone_w_id_coord_mismatch.zip | Bin 0 -> 1176 bytes .../test_geometry_mapping_validator.py | 250 ++++++++++++ tests/unit_tests/test_osw_validation.py | 72 ++++ .../unit_tests/test_osw_validation_extras.py | 225 +++++++++++ 11 files changed, 1086 insertions(+), 3 deletions(-) create mode 100644 src/python_osw_validation/geometry_mapping_validator.py create mode 100644 tests/assets/edge_u_id_coord_mismatch.zip create mode 100644 tests/assets/edge_v_id_coord_mismatch.zip create mode 100644 tests/assets/geom_mapping_valid.zip create mode 100644 tests/assets/zone_w_id_coord_mismatch.zip create mode 100644 tests/unit_tests/test_geometry_mapping_validator.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ed3fe89..265d434 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Change log +### 0.4.0 - 2026-04-30 +- Added geometry mapping consistency validation: `_u_id` and `_v_id` references in edges are now verified against the actual start/end coordinates of the referenced node geometries. +- Added `_w_id` coordinate mapping validation for zones: each referenced node coordinate must be a vertex of the zone's polygon exterior ring, in order. +- Validation errors for coordinate mismatches include file name, feature index, feature ID, referenced node ID, node coordinate, and the actual edge/ring coordinate for actionable debugging. +- New standalone module `geometry_mapping_validator.py` (pure Python, no heavy dependencies) provides `run_geometry_mapping_validation`, `validate_edge_node_mapping`, `validate_zone_node_mapping`, and `build_node_coord_index`. 
+- Added comprehensive test coverage in `tests/unit_tests/test_geometry_mapping_validator.py`. + ### 0.3.7 - 2026-04-29 - Added upfront null/NaN placeholder detection before schema validation, including support for string placeholders such as `"null"` and `"nan"` in feature `properties`. - Changed `issues` behavior to return all detected per-feature schema issues (not only a single best issue per feature). diff --git a/src/python_osw_validation/__init__.py b/src/python_osw_validation/__init__.py index e04ea1d..cac2a70 100644 --- a/src/python_osw_validation/__init__.py +++ b/src/python_osw_validation/__init__.py @@ -2,7 +2,6 @@ import gc import json import math -import traceback from typing import Dict, Any, Optional, List, Tuple import geopandas as gpd import jsonschema_rs @@ -112,6 +111,156 @@ def _get_colset(self, gdf: Optional[gpd.GeoDataFrame], col: str, filekey: str) - self.log_errors(f"Could not create set for column '{col}' in {filekey}.", filekey, None) return set() + # ---------------------------- + # Geometry mapping helpers + # ---------------------------- + + _COORD_TOLERANCE = 1e-7 # ~1 cm at equator + + def _coords_match(self, c1: tuple, c2: tuple) -> bool: + return abs(c1[0] - c2[0]) <= self._COORD_TOLERANCE and abs(c1[1] - c2[1]) <= self._COORD_TOLERANCE + + def _build_node_coord_map(self, nodes_df: gpd.GeoDataFrame) -> Dict[Any, tuple]: + """Return {node_id: (lon, lat)} from a nodes GeoDataFrame.""" + coord_map: Dict[Any, tuple] = {} + for _, row in nodes_df.iterrows(): + try: + nid = row['_id'] + except KeyError: + continue + geom = row.geometry + if nid is not None and geom is not None and geom.geom_type == 'Point': + coord_map[nid] = (geom.x, geom.y) + return coord_map + + def _validate_edge_geometry_mapping( + self, + edges_df: Optional[gpd.GeoDataFrame], + node_coord_map: Dict[Any, tuple], + max_errors: int, + ) -> None: + """Verify edge start/end coordinates match their _u_id/_v_id node geometries.""" + if edges_df is None or not 
node_coord_map: + return + + has_u_id = '_u_id' in edges_df.columns + has_v_id = '_v_id' in edges_df.columns + if not (has_u_id or has_v_id): + return + + for feat_idx, row in edges_df.iterrows(): + if len(self.errors) >= max_errors: + break + + geom = row.geometry + if geom is None or geom.geom_type != 'LineString': + continue + + coords = list(geom.coords) + if not coords: + continue + + try: + edge_id = row['_id'] + except KeyError: + edge_id = feat_idx + + if has_u_id: + try: + u_id = row['_u_id'] + except KeyError: + u_id = None + if u_id is not None and u_id in node_coord_map: + node_coord = node_coord_map[u_id] + edge_start = (coords[0][0], coords[0][1]) + if not self._coords_match(edge_start, node_coord): + self.log_errors( + message=( + f"edges id '{edge_id}' : " + f"start coordinate {edge_start} does not match " + f"node id '{u_id}' coordinate {node_coord} (_u_id mismatch)." + ), + filename='edges', + feature_index=feat_idx, + ) + + if len(self.errors) >= max_errors: + break + + if has_v_id: + try: + v_id = row['_v_id'] + except KeyError: + v_id = None + if v_id is not None and v_id in node_coord_map: + node_coord = node_coord_map[v_id] + edge_end = (coords[-1][0], coords[-1][1]) + if not self._coords_match(edge_end, node_coord): + self.log_errors( + message=( + f"edges id '{edge_id}' : " + f"end coordinate {edge_end} does not match " + f"node id '{v_id}' coordinate {node_coord} (_v_id mismatch)." 
+ ), + filename='edges', + feature_index=feat_idx, + ) + + def _validate_zone_geometry_mapping( + self, + zones_df: Optional[gpd.GeoDataFrame], + node_coord_map: Dict[Any, tuple], + max_errors: int, + ) -> None: + """Verify each _w_id node coordinate is a vertex of the zone's polygon exterior ring.""" + if zones_df is None or not node_coord_map: + return + + if '_w_id' not in zones_df.columns: + return + + for feat_idx, row in zones_df.iterrows(): + if len(self.errors) >= max_errors: + break + + geom = row.geometry + if geom is None or geom.geom_type != 'Polygon': + continue + + try: + zone_id = row['_id'] + except KeyError: + zone_id = feat_idx + + ring_coords = {(c[0], c[1]) for c in geom.exterior.coords} + + try: + w_ids = row['_w_id'] + except KeyError: + continue + + if w_ids is None: + continue + if not isinstance(w_ids, (list, tuple)): + w_ids = [w_ids] + + for w_id in w_ids: + if len(self.errors) >= max_errors: + break + if w_id is None or w_id not in node_coord_map: + continue + node_coord = node_coord_map[w_id] + if not any(self._coords_match(node_coord, rc) for rc in ring_coords): + self.log_errors( + message=( + f"zones id '{zone_id}' : " + f"node id '{w_id}' coordinate {node_coord} is not a vertex " + f"of the zone polygon geometry (_w_id coordinate mismatch)." 
+ ), + filename='zones', + feature_index=feat_idx, + ) + def _schema_key_from_text(self, text: Optional[str]) -> Optional[str]: """Return dataset key from exact filename suffixes only.""" if not text: @@ -370,6 +519,13 @@ def _finalize(is_valid: bool, errors: Optional[List[str]] = None) -> ValidationR feature_index=None ) + # Geometry mapping: coordinate consistency using already-loaded GeoDataFrames + if nodes_df is not None and len(self.errors) < max_errors: + node_coord_map = self._build_node_coord_map(nodes_df) + if node_coord_map: + self._validate_edge_geometry_mapping(edges_df, node_coord_map, max_errors) + self._validate_zone_geometry_mapping(zones_df, node_coord_map, max_errors) + # Geometry validation: check geometry type and SFA validity for osw_file, gdf in OSW_DATASET.items(): if gdf is None: @@ -454,7 +610,6 @@ def _finalize(is_valid: bool, errors: Optional[List[str]] = None) -> ValidationR filename=None, feature_index=None ) - traceback.print_exc() return _finalize(False) finally: # Cleanup extracted files diff --git a/src/python_osw_validation/geometry_mapping_validator.py b/src/python_osw_validation/geometry_mapping_validator.py new file mode 100644 index 0000000..754d44a --- /dev/null +++ b/src/python_osw_validation/geometry_mapping_validator.py @@ -0,0 +1,374 @@ +"""Geometry mapping consistency validation for OSW datasets. + +This module validates that cross-file references between OSW dataset +components correspond to real geometries: + +* ``edges._u_id`` / ``edges._v_id`` must reference an existing node in + ``nodes.geojson`` AND the node's coordinates must equal the corresponding + endpoint of the edge ``LineString`` (start coord for ``_u_id``, end coord + for ``_v_id``). +* ``zones._w_id`` is a list of node ids whose coordinates must match the + zone polygon's outer ring vertices, in order. + +The validator emits structured issues with ``filename``, ``feature_index``, +``feature_id`` (when available) and a clear, actionable error message. 
+ +Coordinate matching is performed by exact equality on the (lon, lat) pair. +Altitude / extra coordinate dimensions are ignored. +""" + +from __future__ import annotations + +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple +import math + + +# --------------------------------------------------------------------------- +# Coordinate helpers +# --------------------------------------------------------------------------- + +Coord = Tuple[float, float] + + +def _coord_key(coord: Sequence[Any]) -> Optional[Coord]: + """Return (lon, lat) tuple for a coordinate, or None when malformed.""" + if not isinstance(coord, (list, tuple)) or len(coord) < 2: + return None + try: + lon = float(coord[0]) + lat = float(coord[1]) + except (TypeError, ValueError): + return None + if math.isnan(lon) or math.isnan(lat): + return None + return (lon, lat) + + +def _format_coord(coord: Optional[Sequence[Any]]) -> str: + key = _coord_key(coord) if coord is not None else None + if key is None: + return "" + return f"({key[0]}, {key[1]})" + + +# --------------------------------------------------------------------------- +# Index builders +# --------------------------------------------------------------------------- + + +def build_node_coord_index(nodes_geojson: Optional[Dict[str, Any]]) -> Dict[str, Coord]: + """Map each node ``_id`` to its (lon, lat) coordinate. + + Nodes without a usable ``_id`` or geometry are skipped silently; + higher level validation already reports those issues. 
+ """ + index: Dict[str, Coord] = {} + if not isinstance(nodes_geojson, dict): + return index + features = nodes_geojson.get("features") or [] + if not isinstance(features, list): + return index + for feature in features: + if not isinstance(feature, dict): + continue + props = feature.get("properties") or {} + node_id = props.get("_id") if isinstance(props, dict) else None + if not isinstance(node_id, str) or not node_id: + continue + geom = feature.get("geometry") or {} + if not isinstance(geom, dict) or geom.get("type") != "Point": + continue + coord = _coord_key(geom.get("coordinates")) + if coord is None: + continue + # First write wins - duplicate id detection is reported elsewhere. + index.setdefault(node_id, coord) + return index + + +# --------------------------------------------------------------------------- +# Edge validation +# --------------------------------------------------------------------------- + + +def validate_edge_node_mapping( + edges_geojson: Optional[Dict[str, Any]], + node_coords: Dict[str, Coord], + edges_filename: str = "edges", + nodes_present: bool = True, +) -> List[Dict[str, Any]]: + """Validate ``_u_id`` and ``_v_id`` references on edges. + + Reports issues for: + * Missing reference values + * References whose target node id does not exist in nodes + * References whose target node coordinate does not match the matching + endpoint of the edge ``LineString`` + + When ``nodes_present`` is False (no nodes file in dataset) coordinate + matching is skipped because the targets are unknown. 
+ """ + issues: List[Dict[str, Any]] = [] + if not isinstance(edges_geojson, dict): + return issues + features = edges_geojson.get("features") or [] + if not isinstance(features, list): + return issues + + for idx, feature in enumerate(features): + if not isinstance(feature, dict): + continue + props = feature.get("properties") or {} + if not isinstance(props, dict): + props = {} + feature_id = props.get("_id") if isinstance(props.get("_id"), str) else None + geom = feature.get("geometry") or {} + coords = geom.get("coordinates") if isinstance(geom, dict) else None + is_linestring = ( + isinstance(geom, dict) + and geom.get("type") == "LineString" + and isinstance(coords, list) + and len(coords) >= 2 + ) + u_endpoint = _coord_key(coords[0]) if is_linestring else None + v_endpoint = _coord_key(coords[-1]) if is_linestring else None + + for ref_field, endpoint, endpoint_label in ( + ("_u_id", u_endpoint, "start"), + ("_v_id", v_endpoint, "end"), + ): + ref_val = props.get(ref_field) + if ref_val is None or (isinstance(ref_val, str) and not ref_val.strip()): + issues.append(_make_issue( + edges_filename, idx, feature_id, + f"Edge is missing required '{ref_field}' reference.", + )) + continue + if not isinstance(ref_val, str): + # Schema validation already complains about non-string ids; skip + # cross-file checks for these. + continue + + if not nodes_present: + # Only the existence check is meaningful, and we have no node set. + continue + + target = node_coords.get(ref_val) + if target is None: + issues.append(_make_issue( + edges_filename, idx, feature_id, + f"Edge {ref_field}='{ref_val}' does not reference any node in nodes.geojson.", + )) + continue + + if endpoint is None: + # Geometry was malformed; schema validation reports that. Skip + # coordinate mismatch reporting to avoid noise. 
+ continue + + if target != endpoint: + issues.append(_make_issue( + edges_filename, idx, feature_id, + (f"Edge {ref_field}='{ref_val}' coordinate mismatch: " + f"node is at {_format_coord(target)} but edge {endpoint_label} " + f"point is at {_format_coord(endpoint)}."), + )) + return issues + + +# --------------------------------------------------------------------------- +# Zone validation +# --------------------------------------------------------------------------- + + +def _polygon_outer_ring(geom: Any) -> Optional[List[Sequence[Any]]]: + """Return the outer ring coordinate list for a Polygon, else None.""" + if not isinstance(geom, dict): + return None + if geom.get("type") != "Polygon": + return None + coords = geom.get("coordinates") + if not isinstance(coords, list) or not coords: + return None + ring = coords[0] + if not isinstance(ring, list) or len(ring) < 3: + return None + return ring + + +def validate_zone_node_mapping( + zones_geojson: Optional[Dict[str, Any]], + node_coords: Dict[str, Coord], + zones_filename: str = "zones", + nodes_present: bool = True, +) -> List[Dict[str, Any]]: + """Validate ``_w_id`` references on zones. + + Each zone feature must have a ``_w_id`` array whose entries: + 1. Reference real node ids + 2. Map to coordinates that match the polygon's outer-ring vertices + in the same order. The ring may include a closing vertex + (last == first); when present the closing vertex is ignored + when comparing lengths. 
+ """ + issues: List[Dict[str, Any]] = [] + if not isinstance(zones_geojson, dict): + return issues + features = zones_geojson.get("features") or [] + if not isinstance(features, list): + return issues + + for idx, feature in enumerate(features): + if not isinstance(feature, dict): + continue + props = feature.get("properties") or {} + if not isinstance(props, dict): + props = {} + feature_id = props.get("_id") if isinstance(props.get("_id"), str) else None + w_ids = props.get("_w_id") + if w_ids is None: + issues.append(_make_issue( + zones_filename, idx, feature_id, + "Zone is missing required '_w_id' reference list.", + )) + continue + if not isinstance(w_ids, (list, tuple)): + issues.append(_make_issue( + zones_filename, idx, feature_id, + "Zone '_w_id' must be an array of node identifiers.", + )) + continue + if not w_ids: + issues.append(_make_issue( + zones_filename, idx, feature_id, + "Zone '_w_id' array is empty; expected node references for the polygon ring.", + )) + continue + + # Existence check + missing_refs = [w for w in w_ids if not (isinstance(w, str) and w in node_coords)] + if nodes_present and missing_refs: + preview = ", ".join(map(str, missing_refs[:5])) + more = f" (+{len(missing_refs) - 5} more)" if len(missing_refs) > 5 else "" + issues.append(_make_issue( + zones_filename, idx, feature_id, + f"Zone '_w_id' references unknown node id(s): {preview}{more}.", + )) + # We continue to coordinate check on a best-effort basis below + # only if all references resolve; otherwise it's noisy. + continue + + ring = _polygon_outer_ring(feature.get("geometry")) + if ring is None: + # Schema/geometry validation already reports malformed polygons. 
+ continue + if not nodes_present: + continue + + ring_coords = [_coord_key(c) for c in ring] + is_closed = ( + len(ring_coords) >= 2 + and ring_coords[0] is not None + and ring_coords[0] == ring_coords[-1] + ) + # Two conventions are accepted for closed rings: + # * `_w_id` contains an entry for every ring vertex, *including* the + # trailing duplicate that closes the polygon. + # * `_w_id` contains an entry only for each unique ring vertex (no + # trailing duplicate). + # For open rings only the first convention is valid. + if is_closed and len(w_ids) == len(ring_coords) - 1: + comparable_ring = ring_coords[:-1] + else: + comparable_ring = ring_coords + + if len(comparable_ring) != len(w_ids): + issues.append(_make_issue( + zones_filename, idx, feature_id, + (f"Zone '_w_id' has {len(w_ids)} entries but polygon ring has " + f"{len(comparable_ring)} vertices; they must align."), + )) + continue + + mismatches: List[str] = [] + for pos, (wid, ring_pt) in enumerate(zip(w_ids, comparable_ring)): + if not isinstance(wid, str): + mismatches.append( + f"position {pos}: '_w_id' entry is not a string" + ) + continue + node_pt = node_coords.get(wid) + if node_pt is None or ring_pt is None or node_pt != ring_pt: + mismatches.append( + f"position {pos}: '_w_id'='{wid}' at node {_format_coord(node_pt)} " + f"vs ring vertex {_format_coord(ring_pt)}" + ) + if mismatches: + preview = "; ".join(mismatches[:3]) + more = f"; and {len(mismatches) - 3} more" if len(mismatches) > 3 else "" + issues.append(_make_issue( + zones_filename, idx, feature_id, + f"Zone polygon ring does not match '_w_id' node coordinates: {preview}{more}.", + )) + return issues + + +# --------------------------------------------------------------------------- +# Issue construction +# --------------------------------------------------------------------------- + + +def _make_issue( + filename: str, + feature_index: Optional[int], + feature_id: Optional[str], + message: str, +) -> Dict[str, Any]: + return { + 
"filename": filename, + "feature_index": feature_index, + "feature_id": feature_id, + "error_message": [message], + } + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def run_geometry_mapping_validation( + nodes_geojson: Optional[Dict[str, Any]], + edges_geojson: Optional[Dict[str, Any]], + zones_geojson: Optional[Dict[str, Any]], + filenames: Optional[Dict[str, str]] = None, +) -> List[Dict[str, Any]]: + """Run all geometry mapping validations and return aggregated issues. + + ``filenames`` maps the dataset key (``edges``, ``zones``, ``nodes``) + to the filename used in error messages so that downstream tooling sees + the actual file rather than a generic dataset key. + """ + fn = filenames or {} + edges_name = fn.get("edges", "edges") + zones_name = fn.get("zones", "zones") + + nodes_present = isinstance(nodes_geojson, dict) and bool(nodes_geojson.get("features")) + node_coords = build_node_coord_index(nodes_geojson) if nodes_present else {} + + issues: List[Dict[str, Any]] = [] + issues.extend(validate_edge_node_mapping( + edges_geojson, node_coords, edges_name, nodes_present + )) + issues.extend(validate_zone_node_mapping( + zones_geojson, node_coords, zones_name, nodes_present + )) + return issues + + +__all__ = [ + "build_node_coord_index", + "run_geometry_mapping_validation", + "validate_edge_node_mapping", + "validate_zone_node_mapping", +] diff --git a/src/python_osw_validation/version.py b/src/python_osw_validation/version.py index d93912e..abeeedb 100644 --- a/src/python_osw_validation/version.py +++ b/src/python_osw_validation/version.py @@ -1 +1 @@ -__version__ = '0.3.7' +__version__ = '0.4.0' diff --git a/tests/assets/edge_u_id_coord_mismatch.zip b/tests/assets/edge_u_id_coord_mismatch.zip new file mode 100644 index 0000000000000000000000000000000000000000..ac8cb517df6bbbc477c4bd5957ef0c2f7d043257 GIT binary patch 
literal 1142 zcmdT@!AiqG5KUY4OO+;2?8q4Cu_ZLuohGGzO!l7?9(h_y$sjx>K*&Xu5O6 zVuhtC!&7L+dOV;iM1#!{&AFCk?VPJYxzH5lYGSVL+}jZ1;d$6REL#?RUxk1m{7*t$ zaRJvxVLN=IMrSQY&F-wPtKhz%K`gyVE)^~BR?8-#11U}7Q#-I#OOiX{VYGCwB6gFx z_#|f}Hk$5lPm@L`={>o+WQ9- NT*mha|8EsqeE^~AR22XK literal 0 HcmV?d00001 diff --git a/tests/assets/edge_v_id_coord_mismatch.zip b/tests/assets/edge_v_id_coord_mismatch.zip new file mode 100644 index 0000000000000000000000000000000000000000..731eab3bf9be3b36fc97f974491de66724177c6b GIT binary patch literal 940 zcmdT@%}T>S5bmEoL2q8dB6tZ&S})S0f*|c7;IW1hnrX7y?uOm1rG$Vt@#+Hzf^X)F zIFq!-R4sTkVQ1O-X1^KsoAHP3PPakd>)XrC)rordn~esoBlO7>gys>f!ug$cgp5Et zG00mjMaGWxG=W9P1{O<WtJqbj6E0#Sg#*#jtr5ANbLy6t9in2k#mqVdV!DLT zEEQbH`5dN(m*A{)Y77`>yP0LG{K(tf*!FJJYNDY%9IGwz)>ojw4W&K8IO literal 0 HcmV?d00001 diff --git a/tests/assets/geom_mapping_valid.zip b/tests/assets/geom_mapping_valid.zip new file mode 100644 index 0000000000000000000000000000000000000000..93ba1ccb6111e33ba9a46ec1e0613d411c79ad26 GIT binary patch literal 974 zcmdT@%}T>S5N@sd0Mc78VG&Pt^P@%VRq#>|0WU&ZN@%9ZYP%bkULe>Y$c^`!U literal 0 HcmV?d00001 diff --git a/tests/assets/zone_w_id_coord_mismatch.zip b/tests/assets/zone_w_id_coord_mismatch.zip new file mode 100644 index 0000000000000000000000000000000000000000..a7b9794215c19e87ad0a58028ff206a94d3092e9 GIT binary patch literal 1176 zcmdT^%}T>S5RR>S@!FdpVG;3AHvPNUtKg-VoV>(RmS#;>)7_BW653J_AHfF@1Yg1D z@D+RnAHkWVHl|`w=*5Jc%wlPjo1021_y7k>597{rwKR73>*r(tAsRY9V?aiLNdcMgcfZz zZ4kpH*y03ION_`&3o;TuW|6kZ608N*Dy=`qYX2fuXPdDgVAZ+0TZ(mt=K@g`s|T@$ zOSeR@mlN;pa>uf8d?(mk$^S*LXcpuQ-JqD~tx49jy)_9wbu1F-na@`pNja?erSmot zaFpgO^^m48Pdl0HX0k^BnS>$;Q-P-ND{II@jQRfbV-9XCLOGTu1-_ literal 0 HcmV?d00001 diff --git a/tests/unit_tests/test_geometry_mapping_validator.py b/tests/unit_tests/test_geometry_mapping_validator.py new file mode 100644 index 0000000..4a1af18 --- /dev/null +++ b/tests/unit_tests/test_geometry_mapping_validator.py @@ -0,0 +1,250 @@ +"""Unit tests for 
python_osw_validation.geometry_mapping_validator.""" +from __future__ import annotations + +import importlib.util +import os +import sys +import unittest +from typing import Any, Dict, List + +# Make `src` importable for local development checkouts. +ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)) +SRC = os.path.join(ROOT, "src") +if SRC not in sys.path: + sys.path.insert(0, SRC) + +# Import the geometry_mapping_validator module directly so this test file +# does not require the heavy optional dependencies (geopandas, jsonschema_rs) +# of the parent package's __init__.py. The module itself is pure Python. +_MOD_PATH = os.path.join( + SRC, "python_osw_validation", "geometry_mapping_validator.py" +) +_spec = importlib.util.spec_from_file_location( + "_geometry_mapping_validator", _MOD_PATH +) +_geometry_mapping_validator = importlib.util.module_from_spec(_spec) +assert _spec.loader is not None +_spec.loader.exec_module(_geometry_mapping_validator) + +build_node_coord_index = _geometry_mapping_validator.build_node_coord_index +run_geometry_mapping_validation = _geometry_mapping_validator.run_geometry_mapping_validation +validate_edge_node_mapping = _geometry_mapping_validator.validate_edge_node_mapping +validate_zone_node_mapping = _geometry_mapping_validator.validate_zone_node_mapping + + +def _node(node_id: str, lon: float, lat: float) -> Dict[str, Any]: + return { + "type": "Feature", + "properties": {"_id": node_id}, + "geometry": {"type": "Point", "coordinates": [lon, lat]}, + } + + +def _nodes(*pairs) -> Dict[str, Any]: + return {"type": "FeatureCollection", "features": [_node(*p) for p in pairs]} + + +def _edge(edge_id: str, u_id: str, v_id: str, coords: List[List[float]]) -> Dict[str, Any]: + return { + "type": "Feature", + "properties": {"_id": edge_id, "_u_id": u_id, "_v_id": v_id}, + "geometry": {"type": "LineString", "coordinates": coords}, + } + + +def _edges(*features) -> Dict[str, Any]: + return {"type": 
"FeatureCollection", "features": list(features)} + + +def _zone(zone_id: str, w_ids: List[str], ring: List[List[float]]) -> Dict[str, Any]: + return { + "type": "Feature", + "properties": {"_id": zone_id, "_w_id": w_ids}, + "geometry": {"type": "Polygon", "coordinates": [ring]}, + } + + +def _zones(*features) -> Dict[str, Any]: + return {"type": "FeatureCollection", "features": list(features)} + + +def _messages(issues: List[Dict[str, Any]]) -> List[str]: + out: List[str] = [] + for it in issues: + msgs = it.get("error_message") or [] + if msgs: + out.append(msgs[0]) + return out + + +class BuildNodeCoordIndexTests(unittest.TestCase): + def test_returns_empty_for_invalid_inputs(self): + self.assertEqual(build_node_coord_index(None), {}) + self.assertEqual(build_node_coord_index({}), {}) + self.assertEqual(build_node_coord_index({"features": "not a list"}), {}) + + def test_indexes_valid_nodes_only(self): + nodes = _nodes(("n1", 1.0, 2.0), ("n2", 3.0, 4.0)) + # Add a malformed feature that should be skipped + nodes["features"].append({"type": "Feature", "properties": {"_id": "bad"}}) + nodes["features"].append({"type": "Feature", "properties": {}, + "geometry": {"type": "Point", "coordinates": [9, 9]}}) + idx = build_node_coord_index(nodes) + self.assertEqual(idx, {"n1": (1.0, 2.0), "n2": (3.0, 4.0)}) + + def test_first_id_wins_when_duplicates(self): + nodes = _nodes(("n1", 1.0, 2.0), ("n1", 9.0, 9.0)) + idx = build_node_coord_index(nodes) + self.assertEqual(idx["n1"], (1.0, 2.0)) + + +class EdgeMappingTests(unittest.TestCase): + def setUp(self): + self.nodes = _nodes(("n1", 0.0, 0.0), ("n2", 1.0, 1.0)) + self.node_idx = build_node_coord_index(self.nodes) + + def test_valid_edge_passes(self): + edges = _edges(_edge("e1", "n1", "n2", [[0.0, 0.0], [0.5, 0.5], [1.0, 1.0]])) + issues = validate_edge_node_mapping(edges, self.node_idx) + self.assertEqual(issues, []) + + def test_missing_u_id_reports_per_feature(self): + edges = _edges({ + "type": "Feature", + 
"properties": {"_id": "e1", "_v_id": "n2"}, + "geometry": {"type": "LineString", "coordinates": [[0.0, 0.0], [1.0, 1.0]]}, + }) + issues = validate_edge_node_mapping(edges, self.node_idx) + self.assertEqual(len(issues), 1) + self.assertEqual(issues[0]["feature_index"], 0) + self.assertEqual(issues[0]["feature_id"], "e1") + self.assertIn("_u_id", issues[0]["error_message"][0]) + + def test_unknown_reference_reports(self): + edges = _edges(_edge("e1", "ghost", "n2", [[0.0, 0.0], [1.0, 1.0]])) + msgs = _messages(validate_edge_node_mapping(edges, self.node_idx)) + self.assertTrue(any("does not reference any node" in m for m in msgs)) + self.assertTrue(any("'ghost'" in m for m in msgs)) + + def test_coordinate_mismatch_reports(self): + # _u_id correct id but the line does not start at n1's coordinate. + edges = _edges(_edge("e1", "n1", "n2", [[5.0, 5.0], [1.0, 1.0]])) + msgs = _messages(validate_edge_node_mapping(edges, self.node_idx)) + self.assertTrue(any("coordinate mismatch" in m for m in msgs)) + self.assertTrue(any("(5.0, 5.0)" in m for m in msgs)) + self.assertTrue(any("(0.0, 0.0)" in m for m in msgs)) + + def test_filename_propagates_into_issue(self): + edges = _edges(_edge("e1", "ghost", "n2", [[0.0, 0.0], [1.0, 1.0]])) + issues = validate_edge_node_mapping(edges, self.node_idx, edges_filename="awesome.edges.geojson") + self.assertEqual(issues[0]["filename"], "awesome.edges.geojson") + + def test_skips_coordinate_check_when_no_nodes(self): + edges = _edges(_edge("e1", "n1", "n2", [[5.0, 5.0], [9.0, 9.0]])) + issues = validate_edge_node_mapping(edges, {}, nodes_present=False) + self.assertEqual(issues, []) + + +class ZoneMappingTests(unittest.TestCase): + def setUp(self): + # A square: (0,0)-(1,0)-(1,1)-(0,1) + self.nodes = _nodes(("a", 0.0, 0.0), ("b", 1.0, 0.0), + ("c", 1.0, 1.0), ("d", 0.0, 1.0)) + self.node_idx = build_node_coord_index(self.nodes) + self.ring_closed = [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0]] + self.w_ids = ["a", "b", "c", 
"d"] + + def test_valid_zone_passes(self): + zones = _zones(_zone("z1", self.w_ids, self.ring_closed)) + self.assertEqual(validate_zone_node_mapping(zones, self.node_idx), []) + + def test_open_ring_also_valid(self): + ring_open = self.ring_closed[:-1] + zones = _zones(_zone("z1", self.w_ids, ring_open)) + self.assertEqual(validate_zone_node_mapping(zones, self.node_idx), []) + + def test_missing_w_id_reports(self): + zone = {"type": "Feature", "properties": {"_id": "z1"}, + "geometry": {"type": "Polygon", "coordinates": [self.ring_closed]}} + msgs = _messages(validate_zone_node_mapping(_zones(zone), self.node_idx)) + self.assertTrue(any("missing required '_w_id'" in m for m in msgs)) + + def test_w_id_not_array_reports(self): + zone = {"type": "Feature", "properties": {"_id": "z1", "_w_id": "not-a-list"}, + "geometry": {"type": "Polygon", "coordinates": [self.ring_closed]}} + msgs = _messages(validate_zone_node_mapping(_zones(zone), self.node_idx)) + self.assertTrue(any("must be an array" in m for m in msgs)) + + def test_unknown_w_id_reports(self): + zones = _zones(_zone("z1", ["a", "b", "ghost", "d"], self.ring_closed)) + msgs = _messages(validate_zone_node_mapping(zones, self.node_idx)) + self.assertTrue(any("unknown node id" in m for m in msgs)) + self.assertTrue(any("ghost" in m for m in msgs)) + + def test_length_mismatch_reports(self): + zones = _zones(_zone("z1", ["a", "b", "c"], self.ring_closed)) # 3 ids vs 4 vertices + msgs = _messages(validate_zone_node_mapping(zones, self.node_idx)) + self.assertTrue(any("must align" in m for m in msgs)) + + def test_ring_coordinate_mismatch_reports(self): + # All ids resolve but order is wrong → coordinates don't line up. 
+ zones = _zones(_zone("z1", ["a", "c", "b", "d"], self.ring_closed)) + msgs = _messages(validate_zone_node_mapping(zones, self.node_idx)) + self.assertTrue(any("does not match" in m for m in msgs)) + + +class IntegrationTests(unittest.TestCase): + def test_runs_all_validators_and_aggregates(self): + nodes = _nodes(("n1", 0.0, 0.0), ("n2", 1.0, 1.0), + ("a", 0.0, 0.0), ("b", 1.0, 0.0), + ("c", 1.0, 1.0), ("d", 0.0, 1.0)) + edges = _edges(_edge("e_bad", "n1", "ghost", [[0.0, 0.0], [9.0, 9.0]])) + zones = _zones(_zone("z_bad", ["a", "c", "b", "d"], + [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0]])) + issues = run_geometry_mapping_validation( + nodes_geojson=nodes, + edges_geojson=edges, + zones_geojson=zones, + filenames={"edges": "edges.geojson", "zones": "zones.geojson"}, + ) + # We expect at least two issues: + # - one for the edge's unknown _v_id ("ghost") + # - none for _u_id: n1=(0,0) matches the edge start (0,0) + # - one for the zone ring mismatch (_w_id order differs from the ring order) + self.assertGreaterEqual(len(issues), 2) + self.assertTrue(any(i["filename"] == "edges.geojson" for i in issues)) + self.assertTrue(any(i["filename"] == "zones.geojson" for i in issues)) + # Each issue carries feature_index and feature_id + for it in issues: + self.assertIn("feature_index", it) + self.assertIn("feature_id", it) + + def test_no_issues_when_dataset_is_consistent(self): + nodes = _nodes(("n1", 0.0, 0.0), ("n2", 1.0, 1.0)) + edges = _edges(_edge("e1", "n1", "n2", [[0.0, 0.0], [1.0, 1.0]])) + issues = run_geometry_mapping_validation( + nodes_geojson=nodes, + edges_geojson=edges, + zones_geojson=None, + ) + self.assertEqual(issues, []) + + def test_handles_missing_nodes_file_gracefully(self): + # No nodes available → existence and coordinate checks become no-ops + # but missing _u_id still flagged. 
+ edges = _edges({ + "type": "Feature", + "properties": {"_id": "e1", "_v_id": "n2"}, + "geometry": {"type": "LineString", "coordinates": [[0.0, 0.0], [1.0, 1.0]]}, + }) + issues = run_geometry_mapping_validation( + nodes_geojson=None, + edges_geojson=edges, + zones_geojson=None, + ) + msgs = _messages(issues) + self.assertTrue(any("_u_id" in m for m in msgs)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit_tests/test_osw_validation.py b/tests/unit_tests/test_osw_validation.py index bfb85bb..b1aff44 100644 --- a/tests/unit_tests/test_osw_validation.py +++ b/tests/unit_tests/test_osw_validation.py @@ -44,6 +44,11 @@ def setUp(self): self.schema_file_path = SCHEMA_FILE_PATH self.schema_paths = SCHEMA_PATHS self.invalid_schema_file_path = INVALID_SCHEMA_FILE_PATH + # Geometry mapping fixtures + self.geom_mapping_valid = os.path.join(ASSETS_PATH, 'geom_mapping_valid.zip') + self.edge_u_id_coord_mismatch = os.path.join(ASSETS_PATH, 'edge_u_id_coord_mismatch.zip') + self.edge_v_id_coord_mismatch = os.path.join(ASSETS_PATH, 'edge_v_id_coord_mismatch.zip') + self.zone_w_id_coord_mismatch = os.path.join(ASSETS_PATH, 'zone_w_id_coord_mismatch.zip') def test_valid_zipfile(self): validation = OSWValidation(zipfile_path=self.valid_zipfile) @@ -302,6 +307,73 @@ def test_issue_3297_issue_payload(self): self.assertIn("Acceptable values can be one of dashes|dots|ladder|ladder:paired|ladder:skewed| and 14 more", flattened) self.assertIn("Invalid value at 'step_count': 'test' . 
Acceptable datatype is integer ; provide a valid value and retry", flattened) + # ------------------------------------------------------------------ + # Geometry mapping tests (v0.4.0) + # ------------------------------------------------------------------ + + def test_geom_mapping_valid_passes(self): + """Dataset where edge endpoints exactly match their referenced node coordinates.""" + validation = OSWValidation(zipfile_path=self.geom_mapping_valid) + result = validation.validate() + self.assertTrue(result.is_valid, f"Expected valid; errors={result.errors}") + self.assertIsNone(result.errors) + + def test_edge_u_id_coord_mismatch_fails(self): + """Edge whose start coordinate does not match the _u_id node is rejected.""" + validation = OSWValidation(zipfile_path=self.edge_u_id_coord_mismatch) + result = validation.validate() + self.assertFalse(result.is_valid) + self.assertIsNotNone(result.errors) + mismatch_err = next((e for e in result.errors if '_u_id mismatch' in e), None) + self.assertIsNotNone(mismatch_err, f"Expected _u_id mismatch error; got: {result.errors}") + # The error must name the offending edge and reference the node ID + self.assertIn('e2', mismatch_err) + self.assertIn('n1', mismatch_err) + + def test_edge_u_id_coord_mismatch_issue_has_feature_index(self): + """Geometry mapping issues include filename and feature_index.""" + validation = OSWValidation(zipfile_path=self.edge_u_id_coord_mismatch) + result = validation.validate() + mismatch_issue = next( + (i for i in (result.issues or []) if '_u_id mismatch' in i.get('error_message', '')), + None, + ) + self.assertIsNotNone(mismatch_issue) + self.assertEqual(mismatch_issue['filename'], 'edges') + self.assertIsNotNone(mismatch_issue['feature_index']) + + def test_edge_v_id_coord_mismatch_fails(self): + """Edge whose end coordinate does not match the _v_id node is rejected.""" + validation = OSWValidation(zipfile_path=self.edge_v_id_coord_mismatch) + result = validation.validate() + 
self.assertFalse(result.is_valid) + self.assertIsNotNone(result.errors) + mismatch_err = next((e for e in result.errors if '_v_id mismatch' in e), None) + self.assertIsNotNone(mismatch_err, f"Expected _v_id mismatch error; got: {result.errors}") + self.assertIn('n2', mismatch_err) + + def test_zone_w_id_coord_mismatch_fails(self): + """Zone whose _w_id node coordinate is not a polygon vertex is rejected.""" + validation = OSWValidation(zipfile_path=self.zone_w_id_coord_mismatch) + result = validation.validate() + self.assertFalse(result.is_valid) + self.assertIsNotNone(result.errors) + mismatch_err = next((e for e in result.errors if '_w_id coordinate mismatch' in e), None) + self.assertIsNotNone(mismatch_err, f"Expected _w_id mismatch error; got: {result.errors}") + self.assertIn('w4', mismatch_err) + + def test_zone_w_id_coord_mismatch_issue_has_feature_index(self): + """Zone geometry mapping issues include filename and feature_index.""" + validation = OSWValidation(zipfile_path=self.zone_w_id_coord_mismatch) + result = validation.validate() + mismatch_issue = next( + (i for i in (result.issues or []) if '_w_id coordinate mismatch' in i.get('error_message', '')), + None, + ) + self.assertIsNotNone(mismatch_issue) + self.assertEqual(mismatch_issue['filename'], 'zones') + self.assertIsNotNone(mismatch_issue['feature_index']) + def test_jsonschema_rs_pin_is_0_33_0(self): requirements_path = os.path.join(SRC_DIR, 'requirements.txt') setup_path = os.path.join(SRC_DIR, 'setup.py') diff --git a/tests/unit_tests/test_osw_validation_extras.py b/tests/unit_tests/test_osw_validation_extras.py index 88aeaba..b136bb3 100644 --- a/tests/unit_tests/test_osw_validation_extras.py +++ b/tests/unit_tests/test_osw_validation_extras.py @@ -1034,5 +1034,230 @@ def test_invalid_geometry_logs_index_when__id_missing_and_caps_20(self): self.assertIn("Showing 20 out of 25", msg) +class TestGeometryMappingViaValidate(unittest.TestCase): + """Unit tests for _u_id/_v_id/_w_id coordinate 
mapping through validate().""" + + # ---- GeoDataFrame helpers ---- + + def _nodes_gdf(self, id_coord_pairs): + """[(id, x, y), ...] → GeoDataFrame of Point nodes.""" + return gpd.GeoDataFrame( + { + "_id": [p[0] for p in id_coord_pairs], + "geometry": [Point(p[1], p[2]) for p in id_coord_pairs], + }, + geometry="geometry", crs="EPSG:4326", + ) + + def _edges_gdf(self, rows): + """[(_id, _u_id, _v_id, coords), ...] → GeoDataFrame of LineString edges.""" + return gpd.GeoDataFrame( + { + "_id": [r[0] for r in rows], + "_u_id": [r[1] for r in rows], + "_v_id": [r[2] for r in rows], + "geometry": [LineString(r[3]) for r in rows], + }, + geometry="geometry", crs="EPSG:4326", + ) + + def _zones_gdf(self, rows): + """[(_id, w_ids, ring), ...] → GeoDataFrame of Polygon zones.""" + return gpd.GeoDataFrame( + { + "_id": [r[0] for r in rows], + "_w_id": [r[1] for r in rows], + "geometry": [Polygon(r[2]) for r in rows], + }, + geometry="geometry", crs="EPSG:4326", + ) + + def _patch_env(self, fake_files, read_side_effect): + """Return (zip mock, validator mock, read side effect) for _run to wire into its patches.""" + z = MagicMock() + z.extract_zip.return_value = "/tmp/extracted" + z.remove_extracted_files.return_value = None + + val = MagicMock() + val.files = fake_files + val.externalExtensions = [] + val.is_valid.return_value = True + + return z, val, read_side_effect + + # ---- helper to run validate() with mocks ---- + + def _run(self, fake_files, read_fn): + with patch(_PATCH_ZIP) as PZip, \ + patch(_PATCH_EV) as PVal, \ + patch(_PATCH_VALIDATE, return_value=True), \ + patch(_PATCH_READ_FILE) as PRead, \ + patch(_PATCH_DATASET_FILES, _CANON_DATASET_FILES): + z, val, rf = self._patch_env(fake_files, read_fn) + PZip.return_value = z + PVal.return_value = val + PRead.side_effect = rf + return OSWValidation(zipfile_path="dummy.zip").validate() + + # ---- tests ---- + + def test_valid_edge_mapping_passes(self): + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + edges = 
self._edges_gdf([("e1", "n1", "n2", [(0.0, 0.0), (0.5, 0.5), (1.0, 1.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + self.assertTrue(res.is_valid, f"Expected valid; errors={res.errors}") + + def test_u_id_coord_mismatch_fails(self): + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + # Edge starts at (9,9) but _u_id=n1 is at (0,0) + edges = self._edges_gdf([("e1", "n1", "n2", [(9.0, 9.0), (1.0, 1.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + self.assertFalse(res.is_valid) + self.assertTrue(any("_u_id mismatch" in e for e in (res.errors or []))) + + def test_v_id_coord_mismatch_fails(self): + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + # Edge ends at (8,8) but _v_id=n2 is at (1,1) + edges = self._edges_gdf([("e1", "n1", "n2", [(0.0, 0.0), (8.0, 8.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + self.assertFalse(res.is_valid) + self.assertTrue(any("_v_id mismatch" in e for e in (res.errors or []))) + + def test_mismatch_error_includes_feature_index_and_id(self): + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + edges = self._edges_gdf([("edge-xyz", "n1", "n2", [(9.0, 9.0), (1.0, 1.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + issue = next((i for i in (res.issues or []) if "_u_id mismatch" in i.get("error_message", "")), None) + self.assertIsNotNone(issue) + 
self.assertEqual(issue["filename"], "edges") + self.assertIsNotNone(issue["feature_index"]) + self.assertIn("edge-xyz", issue["error_message"]) + + def test_u_id_not_in_node_map_does_not_double_report(self): + """Unknown _u_id is caught by the existence check; no coord error for it.""" + nodes = self._nodes_gdf([("n1", 0.0, 0.0)]) + edges = self._edges_gdf([("e1", "ghost", "n1", [(5.0, 5.0), (0.0, 0.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + # Only the absence of a coordinate-mismatch error is asserted here; the existence-check error itself is not checked. + coord_errs = [e for e in (res.errors or []) if "mismatch" in e] + self.assertEqual(coord_errs, []) + + def test_no_nodes_file_skips_mapping(self): + """When there are no nodes, coordinate checks are silently skipped.""" + edges = self._edges_gdf([("e1", "n1", "n2", [(0.0, 0.0), (1.0, 1.0)])]) + + def rf(path): + return edges if "edges" in os.path.basename(path) else gpd.GeoDataFrame() + + res = self._run(["/tmp/edges.geojson"], rf) + self.assertTrue(res.is_valid, f"Expected valid; errors={res.errors}") + + def test_w_id_coord_mismatch_fails(self): + ring = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)] + nodes = self._nodes_gdf([ + ("w1", 0.0, 0.0), ("w2", 1.0, 0.0), ("w3", 1.0, 1.0), + ("w4", 9.0, 9.0), # not in ring + ]) + zones = self._zones_gdf([("z1", ["w1", "w2", "w3", "w4"], ring)]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else zones if "zones" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/zones.geojson"], rf) + self.assertFalse(res.is_valid) + self.assertTrue(any("_w_id coordinate mismatch" in e for e in (res.errors or []))) + self.assertTrue(any("w4" in e for e in (res.errors or []))) + + def test_valid_zone_mapping_passes(self): + ring = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)] + nodes = self._nodes_gdf([ + ("w1", 0.0, 
0.0), ("w2", 1.0, 0.0), ("w3", 1.0, 1.0), ("w4", 0.0, 1.0), + ]) + zones = self._zones_gdf([("z1", ["w1", "w2", "w3", "w4"], ring)]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else zones if "zones" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/zones.geojson"], rf) + self.assertTrue(res.is_valid, f"Expected valid; errors={res.errors}") + + def test_max_errors_caps_geometry_mapping_errors(self): + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + # 10 edges all with wrong start coordinate + edges = gpd.GeoDataFrame( + { + "_id": [f"e{i}" for i in range(10)], + "_u_id": ["n1"] * 10, + "_v_id": ["n2"] * 10, + "geometry": [LineString([(9.0, 9.0), (1.0, 1.0)])] * 10, + }, + geometry="geometry", crs="EPSG:4326", + ) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else edges if "edges" in b else gpd.GeoDataFrame() + + with patch(_PATCH_ZIP) as PZip, \ + patch(_PATCH_EV) as PVal, \ + patch(_PATCH_VALIDATE, return_value=True), \ + patch(_PATCH_READ_FILE) as PRead, \ + patch(_PATCH_DATASET_FILES, _CANON_DATASET_FILES): + z = MagicMock() + z.extract_zip.return_value = "/tmp/extracted" + z.remove_extracted_files.return_value = None + PZip.return_value = z + val = MagicMock() + val.files = ["/tmp/nodes.geojson", "/tmp/edges.geojson"] + val.externalExtensions = [] + val.is_valid.return_value = True + PVal.return_value = val + PRead.side_effect = rf + res = OSWValidation(zipfile_path="dummy.zip").validate(max_errors=3) + + self.assertFalse(res.is_valid) + self.assertLessEqual(len(res.errors), 3) + + def test_coord_within_tolerance_no_error(self): + """Coordinates within 1e-7 degrees are accepted as matching.""" + nodes = self._nodes_gdf([("n1", 0.0, 0.0), ("n2", 1.0, 1.0)]) + # Start is 5e-8 off (within tolerance) + edges = self._edges_gdf([("e1", "n1", "n2", [(5e-8, 0.0), (1.0, 1.0)])]) + + def rf(path): + b = os.path.basename(path) + return nodes if "nodes" in b else 
edges if "edges" in b else gpd.GeoDataFrame() + + res = self._run(["/tmp/nodes.geojson", "/tmp/edges.geojson"], rf) + self.assertTrue(res.is_valid, f"Expected valid; errors={res.errors}") + + if __name__ == "__main__": unittest.main()