diff --git a/data_validation.py b/data_validation.py index 60ad578..b534597 100644 --- a/data_validation.py +++ b/data_validation.py @@ -1,15 +1,19 @@ from prefect import task, flow, get_run_logger -from prefect.blocks.system import Secret import time as ttime -from tiled.client import from_profile +from tiled.client import from_uri + + +@task +def get_run(uid, api_key=None): + cl = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key) + run = cl["tst/raw"][uid] + return run @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym): +def read_all_streams(uid, beamline_acronym, api_key=None): logger = get_run_logger() - api_key = Secret.load("tiled-tst-api-key").get() - cl = from_profile("nsls2", api_key=api_key) - run = cl["tst"]["raw"][uid] + run = get_run(uid, api_key=api_key) logger.info(f"Validating uid {run.start['uid']}") start_time = ttime.monotonic() for stream in run: @@ -24,5 +28,5 @@ def read_all_streams(uid, beamline_acronym): @flow -def data_validation(uid): - read_all_streams(uid, beamline_acronym="tst") +def data_validation(uid, api_key=None, dry_run=False): + read_all_streams(uid, beamline_acronym="tst", api_key=api_key) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index f56291b..880166e 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -1,20 +1,32 @@ +import os + from prefect import task, flow, get_run_logger from data_validation import data_validation from test_extra_client import get_other_docs +from dotenv import load_dotenv # from long_flow import long_flow +def get_api_key_from_env(api_key=None): + with open("/srv/container.secret", "r") as secrets: + load_dotenv(stream=secrets) + api_key = os.environ["TILED_API_KEY"] + return api_key + + @task -def log_completion(): +def log_completion(dry_run=False): logger = get_run_logger() - logger.info("Complete") + logger.info(f"Complete! Dry run = {dry_run}") @flow -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): uid = stop_doc["run_start"] # hello_world() - data_validation(uid, return_state=True) - get_other_docs(uid) - # long_flow(iterations=100, sleep_length=10) - log_completion() + if not api_key: + api_key = get_api_key_from_env(api_key=None) + data_validation(uid, return_state=True, api_key=api_key) + get_other_docs(uid, api_key=api_key) + # long_flow(iterations=100, sleep_length=10, dry_run=dry_run) + log_completion(dry_run=dry_run) diff --git a/long_flow.py b/long_flow.py index ca48572..4b9fd5b 100644 --- a/long_flow.py +++ b/long_flow.py @@ -1,18 +1,22 @@ -from prefect import task, flow +from prefect import task, flow, get_run_logger import time as ttime @task -def print_and_sleep(iterations, sleep_length): +def print_and_sleep(iterations, sleep_length, dry_run=False): + logger = get_run_logger() # Long running task print("Long task...") + if dry_run: + logger.info("Dry run: skipping long task") + return for i in range(int(iterations)): print(f"Iteration number {i}") ttime.sleep(int(sleep_length)) @flow(log_prints=True) -def long_flow(iterations, sleep_length): +def long_flow(iterations, sleep_length, dry_run=False): print("Starting long flow...") - print_and_sleep(iterations, sleep_length) + print_and_sleep(iterations, sleep_length, dry_run=dry_run) print("Done!") diff --git a/pixi.toml b/pixi.toml index 50e3351..6438d0d 100644 --- a/pixi.toml +++ b/pixi.toml @@ -8,6 +8,7 @@ prefect = "3.*" python = "<3.14" tiled-client = ">=0.2.3" bluesky-tiled-plugins = ">=2" +python-dotenv = ">=1.2.1,<2" [pypi-dependencies] lixtools = "==2023.1.23.0" diff --git a/prefect.yaml b/prefect.yaml index 065a601..ca5b5c6 100644 --- a/prefect.yaml +++ b/prefect.yaml @@ -14,7 +14,7 @@ pull: deployments: - name: tst-end-of-run-workflow-docker - version: 0.1.1 + version: 0.1.3 tags: - tst - main @@ -24,14 +24,12 @@ deployments: schedule: {} work_pool: job_variables: - env: - TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles image: ghcr.io/nsls2/tst-workflows:main image_pull_policy: Always network_mode: slirp4netns userns: "keep-id:uid=402974,gid=402974" # workflow-tst:workflow-tst volumes: - /nsls2/data/tst/proposals:/nsls2/data/tst/proposals - - /nsls2/software/etc/tiled:/nsls2/software/etc/tiled + - /srv/prefect3-docker-worker-tst/app:/srv auto_remove: true name: tst-work-pool-docker diff --git a/test_extra_client.py b/test_extra_client.py index 30d0b88..8910303 100644 --- a/test_extra_client.py +++ b/test_extra_client.py @@ -1,10 +1,10 @@ from prefect import task, get_run_logger -from utils import get_tiled_client +from data_validation import get_run @task -def get_other_docs(uid): +def get_other_docs(uid, api_key=None): logger = get_run_logger() - result = get_tiled_client()["raw"][uid] + result = get_run(uid, api_key=api_key) for name, doc in result.documents(): logger.info(f"name: {name}, doc: {doc}") diff --git a/utils.py b/utils.py deleted file mode 100644 index 1cd0072..0000000 --- a/utils.py +++ /dev/null @@ -1,13 +0,0 @@ -from tiled.client import from_profile -from prefect.blocks.system import Secret - -import os - -LOCATION = "tst" - - -def get_tiled_client(): - os.environ["TILED_API_KEY"] = Secret.load(f"tiled-{LOCATION}-api-key").get() - tiled_client = from_profile("nsls2")[LOCATION] - os.environ.pop("TILED_API_KEY") - return tiled_client