diff --git a/.gitignore b/.gitignore index cc04cc7..01cbee9 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,8 @@ frontend/requirements.txt frontend/output_timeseries backend/test00112233.csv frontend/api/cache +dagsterio/config/*.dev.yaml +dagsterio/dagster_home # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/backend/config.py b/backend/config.py index 96d379d..0ce94d2 100644 --- a/backend/config.py +++ b/backend/config.py @@ -557,6 +557,8 @@ def _update_output_units(self): parameter = self.parameter.lower() if parameter == "ph": self.analyte_output_units = "" + elif parameter == "waterlevels": + self.analyte_output_units = "dtwbgs (ft)" @property def start_dt(self): diff --git a/backend/persisters/geoserver.py b/backend/persisters/geoserver.py index a6a38de..f1d27fa 100644 --- a/backend/persisters/geoserver.py +++ b/backend/persisters/geoserver.py @@ -5,96 +5,17 @@ # You may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # =============================================================================== -import json -import os import time from itertools import groupby +from typing import Callable -import psycopg2 from shapely.geometry.multipoint import MultiPoint from shapely.geometry.point import Point -from sqlalchemy.dialects.postgresql import JSONB, insert -from sqlalchemy.orm import declarative_base, sessionmaker, relationship +from sqlalchemy.dialects.postgresql import insert from backend.persister import BasePersister -from sqlalchemy import Column, ForeignKey, create_engine, UUID, String, Integer, Float, Date, Time -from geoalchemy2 import Geometry - -Base = declarative_base() -# dbname=db.get('dbname'), -# user=db.get('user'), -# password=db.get('password'), -# host=db.get('host'), -# port=db.get('port'), -def session_factory(connection: dict): - user = connection.get("user", "postgres") - password = connection.get("password", "") - host = connection.get("host", "localhost") - port = connection.get("port", 5432) - database = connection.get("dbname", "gis") - - url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" - engine = create_engine(url) - SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine) - return SessionFactory - - -class Location(Base): - __tablename__ = "tbl_location" - - id = Column(Integer, primary_key=True, index=True) - name = Column(String) - data_source_uid = Column(String, index=True) - - properties = Column(JSONB) - geometry = Column(Geometry(geometry_type="POINT", srid=4326)) - source_slug = Column(String, ForeignKey("tbl_sources.name")) - - source = relationship("Sources", backref="locations") - - -class Summary(Base): - __tablename__ = "tbl_summary" - - id = Column(Integer, primary_key=True, index=True) - name = Column(String) - data_source_uid = Column(String, index=True) - - properties = Column(JSONB) - geometry = Column(Geometry(geometry_type="POINT", srid=4326)) - source_slug = Column(String, ForeignKey("tbl_sources.name")) - parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) - - source = relationship("Sources", backref="summaries") - - value = Column(Float) - nrecords = Column(Integer) - min = Column(Float) - max = Column(Float) - mean = Column(Float) - - latest_value = Column(Float) - latest_date = Column(Date) - latest_time = Column(Time) - - earliest_value = Column(Float) - earliest_date = Column(Date) - earliest_time = Column(Time) - - -class Parameters(Base): - __tablename__ = "tbl_parameters" - name = Column(String, primary_key=True, index=True) - units = Column(String) - - -class Sources(Base): - __tablename__ = "tbl_sources" - id = Column(Integer) - name = Column(String, primary_key=True, index=True) - convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326)) - +from backend.persisters.geoserver_db_models import session_factory, Location, Summary, Parameters, Sources class GeoServerPersister(BasePersister): def __init__(self, *args, **kwargs): @@ -102,7 +23,7 @@ def __init__(self, *args, **kwargs): self._connection = None self._connect() - def dump_sites(self, path: str): + def dump_sites(self, path: str = None): if self.sites: db = self.config.get('geoserver').get('db') dbname = db.get('db_name') @@ -111,7 +32,7 @@ def dump_sites(self, path: str): else: self.log("no sites to dump", fg="red") - def dump_summary(self, path: str): + def dump_summary(self, path: str = None): if self.records: db = self.config.get('geoserver').get('db') dbname = db.get('db_name') @@ -166,7 +87,6 @@ def _write_parameters(self): sql = insert(Parameters).values([{"name": self.config.parameter, "units": self.config.analyte_output_units}]).on_conflict_do_nothing( index_elements=[Parameters.name],) - print(sql) conn.execute(sql) conn.commit() @@ -207,19 +127,6 @@ def make_stmt(chunk): self._chunk_insert(make_stmt, records) - def _chunk_insert(self, make_stmt, records: list, chunk_size: int = 10): - for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") - st = time.time() - - stmt = make_stmt(chunk) - with self._connection as conn: - conn.execute(stmt) - conn.commit() - - print('Chunk write time:', time.time() - st) - def _write_to_sites(self, records: list): """ Write records to a PostgreSQL database in optimized chunks. @@ -250,66 +157,17 @@ def make_stmt(chunk): self._chunk_insert(make_stmt, records, chunk_size) - # - # newrecords = [] - # records = sorted(records, key=lambda r: str(r.id)) - # for name, gs in groupby(records, lambda r: str(r.id)): - # gs = list(gs) - # n = len(gs) - # # print(f"Writing {n} records for {name}") - # if n>1: - # if n > len({r.source for r in gs}): - # print("Duplicate source name found. Skipping...", name, [(r.name, r.source) for r in gs]) - # continue - # newrecords.extend(gs) - # # break - # # pass - # # print("Duplicate source name found. Skipping...", name, [r.source for r in gs]) - # # break - # - # - # for i in range(0, len(newrecords), chunk_size): - # chunk = newrecords[i:i + chunk_size] - # print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") - # st = time.time() - # - # values = [ - # { - # "name": record.name, - # "data_source_uid": record.id, - # "properties": record.to_dict(keys), - # "geometry": f"SRID=4326;POINT({record.longitude} {record.latitude})", - # "source_slug": record.source, - # } - # for record in chunk - # ] - # - # # stmt = insert(Location).values(values).on_conflict_do_nothing() - # linsert = insert(Location) - # stmt = linsert.values(values).on_conflict_do_update( - # index_elements=[Location.data_source_uid], - # set_={"properties": linsert.excluded.properties} - # ) - # - # with self._connection as conn: - # conn.execute(stmt) - # conn.commit() - # - # print('Chunk write time:', time.time() - st) + def _chunk_insert(self, make_stmt: Callable, records: list, chunk_size: int = 10): + for i in range(0, len(records), chunk_size): + chunk = records[i:i + chunk_size] + print(f"Writing chunk {i // chunk_size + 1} of {len(records) // chunk_size + 1}") + st = time.time() + + stmt = make_stmt(chunk) + with self._connection as conn: + conn.execute(stmt) + conn.commit() + + print('Chunk write time:', time.time() - st) - # # Pre-serialize properties to reduce processing time - # values = [ - # (record.name, json.dumps(record.to_dict(keys)), record.longitude, record.latitude, record.source) - # for record in chunk - # ] - # - # with self._connection.cursor() as cursor: - # sql = """INSERT INTO public.tbl_location (name, properties, geometry, source_slug) - # VALUES (%s, %s, public.ST_SetSRID(public.ST_MakePoint(%s, %s), 4326), %s) - # ON CONFLICT (name) DO UPDATE SET properties = EXCLUDED.properties;""" - # cursor.executemany(sql, values) - # - # self._connection.commit() # Commit once per chunk - # print('Chunk write time:', time.time() - st) - # break # ============= EOF ============================================= diff --git a/backend/persisters/geoserver_db_models.py b/backend/persisters/geoserver_db_models.py new file mode 100644 index 0000000..acab86b --- /dev/null +++ b/backend/persisters/geoserver_db_models.py @@ -0,0 +1,100 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +from geoalchemy2 import Geometry +from google.cloud.sql.connector import Connector +from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Float, Date, Time +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import declarative_base, sessionmaker, relationship + +Base = declarative_base() + + +def session_factory(connection: dict): + user = connection.get("user", "postgres") + password = connection.get("password", "") + port = connection.get("port", 5432) + database = connection.get("dbname", "gis") + driver = connection.get("driver", "pg8000") + + url = f'postgresql+{driver}://' + if connection.get("cloud_sql"): + connector= Connector() + instance_connection_name = connection.get("instance_connection_name") + print("Connecting to Cloud SQL instance:", instance_connection_name) + def get_conn(): + return connector.connect( + instance_connection_name, + 'pg8000', + user=user, + password=password, + db=database, + ) + engine = create_engine(url, creator=get_conn) + else: + host = connection.get("host", "localhost") + url = f"{url}{user}:{password}@{host}:{port}/{database}" + engine = create_engine(url) + + return sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +class Location(Base): + __tablename__ = "tbl_location" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String) + data_source_uid = Column(String, index=True) + + properties = Column(JSONB) + geometry = Column(Geometry(geometry_type="POINT", srid=4326)) + source_slug = Column(String, ForeignKey("tbl_sources.name")) + + source = relationship("Sources", backref="locations") + + +class Summary(Base): + __tablename__ = "tbl_summary" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String) + data_source_uid = Column(String, index=True) + + properties = Column(JSONB) + geometry = Column(Geometry(geometry_type="POINT", srid=4326)) + source_slug = Column(String, ForeignKey("tbl_sources.name")) + parameter_slug = Column(String, ForeignKey("tbl_parameters.name")) + + source = relationship("Sources", backref="summaries") + + value = Column(Float) + nrecords = Column(Integer) + min = Column(Float) + max = Column(Float) + mean = Column(Float) + + latest_value = Column(Float) + latest_date = Column(Date) + latest_time = Column(Time) + + earliest_value = Column(Float) + earliest_date = Column(Date) + earliest_time = Column(Time) + + +class Parameters(Base): + __tablename__ = "tbl_parameters" + name = Column(String, primary_key=True, index=True) + units = Column(String) + + +class Sources(Base): + __tablename__ = "tbl_sources" + id = Column(Integer) + name = Column(String, primary_key=True, index=True) + convex_hull = Column(Geometry(geometry_type="POLYGON", srid=4326)) +# ============= EOF ============================================= diff --git a/dagsterio/__init__.py b/dagsterio/__init__.py new file mode 100644 index 0000000..3ce460e --- /dev/null +++ b/dagsterio/__init__.py @@ -0,0 +1,44 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +from os import getenv +from typing import Callable + +from backend.config import Config +from backend.unifier import unify_waterlevels, unify_analytes + + +def base_waterlevels_asset(**payload): + _unify(unify_waterlevels, 'waterlevels', payload) + + +def base_analyte_asset(param: str, **payload: object) -> None: + _unify(unify_analytes, param, payload) + + +def _get_geoserver_connection(): + return { 'db': + { + 'dbname': getenv('GEOSERVER_DBNAME'), + 'user': getenv('GEOSERVER_USER'), + 'password': getenv('GEOSERVER_PASSWORD'), + 'instance_connection_name': getenv('GEOSERVER_INSTANCE_CONNECTION_NAME'), + 'cloud_sql': True + } + } + +def _unify(func: Callable[[Config,], None], parameter: str, payload: dict): + payload['yes'] = True + payload['geoserver'] = _get_geoserver_connection() + payload['output_summary'] = True + payload['output_format']= 'geoserver' + config = Config(payload=payload) + config.parameter = parameter + config.finalize() + + func(config) +# ============= EOF ============================================= diff --git a/dagsterio/assets.py b/dagsterio/assets.py new file mode 100644 index 0000000..6456af9 --- /dev/null +++ b/dagsterio/assets.py @@ -0,0 +1,68 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== + + +import dagster as dg + +from dagsterio import base_analyte_asset, base_waterlevels_asset +from dagsterio.config.source_constants import ALL_SOURCES +from dagsterio.nmbgmr import tds_request_sensor, nmbgmr_tds, nmbgmr_waterlevels + + +@dg.asset +def all_tds(): + """TDS asset""" + + base_analyte_asset( + 'tds', + sources=ALL_SOURCES + ) + + +@dg.asset +def all_waterlevels(): + """Summary water levels asset""" + base_waterlevels_asset( + sources=ALL_SOURCES, + ) + +defs = dg.Definitions( + sensors=[tds_request_sensor], + assets=[all_tds, all_waterlevels, nmbgmr_tds, nmbgmr_waterlevels], + schedules=[ + dg.ScheduleDefinition( + name='all_tds', + target=dg.AssetSelection.keys("all_tds"), + cron_schedule='0 11 * * *', + execution_timezone='America/Denver', + ), + dg.ScheduleDefinition( + name='all_waterlevels', + target=dg.AssetSelection.keys("all_waterlevels"), + cron_schedule='0 12 * * *', + execution_timezone='America/Denver', + ), + + # dg.ScheduleDefinition( + # name='nmbgmr_tds', + # target=dg.AssetSelection.keys("nmbgmr_tds"), + # cron_schedule='0 3 * * *', + # execution_timezone='America/Denver', + # ), + # dg.ScheduleDefinition( + # name='nmbgmr_waterlevels', + # target=dg.AssetSelection.keys("nmbgmr_waterlevels"), + # cron_schedule='0 4 * * *', + # execution_timezone='America/Denver', + # ), + ], + # resources={ + # 'config': dg.ResourceDefinition.hardcoded_resource(Config()), + # }, +) +# ============= EOF ============================================= diff --git a/dagsterio/config/example_config.yaml b/dagsterio/config/example_config.yaml new file mode 100644 index 0000000..ccfbbda --- /dev/null +++ b/dagsterio/config/example_config.yaml @@ -0,0 +1,24 @@ +yes: True +output_format: geoserver +output_summary: True +geoserver: + db: + host: localhost + port: 5432 + dbname: + user: + password: + +sources: + bernco: True + bor: True + cabq: True + ebid: False + nmbgmr_amp: False + nmed_dwb: False + nmose_isc_seven_rivers: True + nmose_roswell: False + nwis: False + pvacd: True + wqp: False + nmose_pod: False diff --git a/dagsterio/config/source_constants.py b/dagsterio/config/source_constants.py new file mode 100644 index 0000000..95ccce7 --- /dev/null +++ b/dagsterio/config/source_constants.py @@ -0,0 +1,38 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== + +ALL_SOURCES = { + 'bernco': True, + 'bor': True, + 'cabq': True, + 'ebid': True, + 'nmbgmr_amp': True, + 'nmed_dwb': True, + 'nmose_isc_seven_rivers': True, + 'nmose_roswell': True, + 'nwis': True, + 'pvacd': True, + 'wqp': True, + 'nmose_pod': False, +} + +NMBGMR_SOURCES = { + 'bernco': False, + 'bor': False, + 'cabq': False, + 'ebid': False, + 'nmbgmr_amp': True, + 'nmed_dwb': False, + 'nmose_isc_seven_rivers': False, + 'nmose_roswell': False, + 'nwis': False, + 'pvacd': False, + 'wqp': False, + 'nmose_pod': False, +} +# ============= EOF ============================================= diff --git a/dagsterio/nmbgmr.py b/dagsterio/nmbgmr.py new file mode 100644 index 0000000..45b300e --- /dev/null +++ b/dagsterio/nmbgmr.py @@ -0,0 +1,71 @@ +# =============================================================================== +# Author: Jake Ross +# Copyright 2025 New Mexico Bureau of Geology & Mineral Resources +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# =============================================================================== +import json + +import dagster as dg +import httpx + +from dagsterio import base_waterlevels_asset, base_analyte_asset +from dagsterio.config.source_constants import NMBGMR_SOURCES + + +@dg.asset +def nmbgmr_waterlevels(): + """NMBGMR water levels asset""" + + base_waterlevels_asset( + sources=NMBGMR_SOURCES, + ) + + +@dg.asset +def nmbgmr_tds(): + """NMBGMR TDS asset""" + base_analyte_asset( + 'tds', + sources=NMBGMR_SOURCES, + ) + + +def get_latest_analyte(param: str, state: dict): + url = 'http://localhost:8009/latest/stats/majorchemistry' + queryparams = {'analyte': param} + resp = httpx.get(url, params=queryparams) + return resp.json().get('count', 0) + + +request_job = dg.define_asset_job( + name='nmbgmr_tds_job', + selection=dg.AssetSelection.assets("nmbgmr_tds"), +) + +@dg.sensor(job=request_job, minimum_interval_seconds=3600) +def tds_request_sensor(context: dg.SensorEvaluationContext): + return analyte_sensor('tds', context) + + +def analyte_sensor(param, context: dg.SensorEvaluationContext): + if context.cursor: + return None + + previous_state = json.loads(context.cursor) if context.cursor else {} + current_state = {} + runs = [] + + latest = get_latest_analyte(param, previous_state) + if latest: + key = f'latest_{param}' + current_state[key] = latest + if latest > previous_state.get(key, 0): + runs.append(dg.RunRequest(run_key=param)) + + return dg.SensorResult( + run_requests=runs, cursor=json.dumps(current_state) + ) + +# ============= EOF ============================================= diff --git a/frontend/cli.py b/frontend/cli.py index 68d8d4a..6dc1bef 100644 --- a/frontend/cli.py +++ b/frontend/cli.py @@ -267,26 +267,27 @@ def weave( config.parameter = parameter - # output type - if output == "summary": - summary = True - timeseries_unified = False - timeseries_separated = False - elif output == "timeseries_unified": - summary = False - timeseries_unified = True - timeseries_separated = False - elif output == "timeseries_separated": - summary = False - timeseries_unified = False - timeseries_separated = True - else: - click.echo(f"Invalid output type: {output}") - return + if config_path is None: + # output type + if output == "summary": + summary = True + timeseries_unified = False + timeseries_separated = False + elif output == "timeseries_unified": + summary = False + timeseries_unified = True + timeseries_separated = False + elif output == "timeseries_separated": + summary = False + timeseries_unified = False + timeseries_separated = True + else: + click.echo(f"Invalid output type: {output}") + return - config.output_summary = summary - config.output_timeseries_unified = timeseries_unified - config.output_timeseries_separated = timeseries_separated + # config.output_summary = summary + # config.output_timeseries_unified = timeseries_unified + # config.output_timeseries_separated = timeseries_separated config_agencies, false_agencies = config.get_config_and_false_agencies() diff --git a/requirements.txt b/requirements.txt index 4e9f7c5..f6d013e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,6 @@ geopandas frost_sta_client google-cloud-storage pytest -urllib3>=2.2.0,<3.0.0 \ No newline at end of file +urllib3>=2.2.2,<3.0.0 +cloud-sql-python-connector[pg8000] +pg8000 \ No newline at end of file