Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from prefect import task, flow, get_run_logger
from prefect.blocks.system import Secret
import time as ttime
from tiled.client import from_profile
from tiled.client import from_uri


@task
def get_run(uid, api_key=None):
cl = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
run = cl["tst/raw"][uid]
return run


@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, beamline_acronym):
def read_all_streams(uid, beamline_acronym, api_key=None):
logger = get_run_logger()
api_key = Secret.load("tiled-tst-api-key").get()
cl = from_profile("nsls2", api_key=api_key)
run = cl["tst"]["raw"][uid]
run = get_run(uid, api_key=api_key)
logger.info(f"Validating uid {run.start['uid']}")
start_time = ttime.monotonic()
for stream in run:
Expand All @@ -24,5 +28,5 @@ def read_all_streams(uid, beamline_acronym):


@flow
def data_validation(uid):
read_all_streams(uid, beamline_acronym="tst")
def data_validation(uid, api_key=None, dry_run=False):
read_all_streams(uid, beamline_acronym="tst", api_key=api_key)
26 changes: 19 additions & 7 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
import os

from prefect import task, flow, get_run_logger
from data_validation import data_validation
from test_extra_client import get_other_docs
from dotenv import load_dotenv
# from long_flow import long_flow


def get_api_key_from_env(api_key=None):
with open("/srv/container.secret", "r") as secrets:
load_dotenv(stream=secrets)
api_key = os.environ["TILED_API_KEY"]
return api_key


@task
def log_completion():
def log_completion(dry_run=False):
logger = get_run_logger()
logger.info("Complete")
logger.info(f"Complete! Dry run = {dry_run}")


@flow
def end_of_run_workflow(stop_doc):
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
uid = stop_doc["run_start"]
# hello_world()
data_validation(uid, return_state=True)
get_other_docs(uid)
# long_flow(iterations=100, sleep_length=10)
log_completion()
if not api_key:
api_key = get_api_key_from_env(api_key=None)
data_validation(uid, return_state=True, api_key=api_key)
get_other_docs(uid, api_key=api_key)
# long_flow(iterations=100, sleep_length=10, dry_run=dry_run)
log_completion(dry_run=dry_run)
12 changes: 8 additions & 4 deletions long_flow.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
from prefect import task, flow
from prefect import task, flow, get_run_logger
import time as ttime


@task
def print_and_sleep(iterations, sleep_length):
def print_and_sleep(iterations, sleep_length, dry_run=False):
logger = get_run_logger()
# Long running task
print("Long task...")
if dry_run:
logger.info("Dry run: skipping long task")
return
for i in range(int(iterations)):
print(f"Iteration number {i}")
ttime.sleep(int(sleep_length))


@flow(log_prints=True)
def long_flow(iterations, sleep_length):
def long_flow(iterations, sleep_length, dry_run=False):
print("Starting long flow...")
print_and_sleep(iterations, sleep_length)
print_and_sleep(iterations, sleep_length, dry_run=dry_run)
print("Done!")
1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ prefect = "3.*"
python = "<3.14"
tiled-client = ">=0.2.3"
bluesky-tiled-plugins = ">=2"
python-dotenv = ">=1.2.1,<2"

[pypi-dependencies]
lixtools = "==2023.1.23.0"
6 changes: 2 additions & 4 deletions prefect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pull:

deployments:
- name: tst-end-of-run-workflow-docker
version: 0.1.1
version: 0.1.3
tags:
- tst
- main
Expand All @@ -24,14 +24,12 @@ deployments:
schedule: {}
work_pool:
job_variables:
env:
TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles
image: ghcr.io/nsls2/tst-workflows:main
image_pull_policy: Always
network_mode: slirp4netns
userns: "keep-id:uid=402974,gid=402974" # workflow-tst:workflow-tst
volumes:
- /nsls2/data/tst/proposals:/nsls2/data/tst/proposals
- /nsls2/software/etc/tiled:/nsls2/software/etc/tiled
- /srv/prefect3-docker-worker-tst/app:/srv
auto_remove: true
name: tst-work-pool-docker
6 changes: 3 additions & 3 deletions test_extra_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from prefect import task, get_run_logger
from utils import get_tiled_client
from data_validation import get_run


@task
def get_other_docs(uid):
def get_other_docs(uid, api_key=None):
logger = get_run_logger()
result = get_tiled_client()["raw"][uid]
result = get_run(uid, api_key=api_key)
for name, doc in result.documents():
logger.info(f"name: {name}, doc: {doc}")
13 changes: 0 additions & 13 deletions utils.py

This file was deleted.

Loading