diff --git a/README.md b/README.md index bc4dbeb..a5d058d 100644 --- a/README.md +++ b/README.md @@ -688,6 +688,61 @@ The system is designed to scale horizontally. The conserver service can be scale docker compose up --scale conserver=4 -d ``` +## SCITT Lifecycle Registration + +The `links.scitt` module registers vCon lifecycle events on a [SCRAPI](https://datatracker.ietf.org/doc/draft-ietf-scitt-scrapi/)-compatible transparency service, creating an immutable audit trail per [draft-howe-vcon-lifecycle](https://www.ietf.org/archive/id/draft-howe-vcon-lifecycle-00.html). + +Each registration creates a COSE Sign1 signed statement from the vCon's SHA-256 hash and registers it via `POST /entries`. The receipt is stored as a `scitt_receipt` analysis entry on the vCon. + +### Configuration + +```yaml +links: + scitt_created: + module: links.scitt + options: + scrapi_url: http://scittles:8000 # SCRAPI service URL + signing_key_path: /etc/scitt/signing-key.pem # EC P-256 key + issuer: conserver # CWT issuer claim + key_id: conserver-key-1 # COSE key ID + vcon_operation: vcon_created # Lifecycle event type + + scitt_enhanced: + module: links.scitt + options: + scrapi_url: http://scittles:8000 + signing_key_path: /etc/scitt/signing-key.pem + issuer: conserver + key_id: conserver-key-1 + vcon_operation: vcon_enhanced +``` + +Use two instances in a chain to capture the vCon hash before and after transcription: + +```yaml +chains: + transcription_chain: + links: + - tag + - scitt_created # Hash before transcription + - wtf_transcribe + - keyword_tagger + - scitt_enhanced # Hash after transcription + - expire_vcon +``` + +### Signing Key + +Generate an EC P-256 signing key: + +```bash +openssl ecparam -name prime256v1 -genkey -noout -out scitt-signing-key.pem +``` + +### Transparency Service + +The link is compatible with any SCRAPI service. [SCITTLEs](https://github.com/vcon-dev/scittles) is a lightweight, self-hosted option using SQLite. 
+ ## Storage Modules ### PostgreSQL Storage diff --git a/server/links/scitt/__init__.py b/server/links/scitt/__init__.py index ff010eb..b42cef2 100644 --- a/server/links/scitt/__init__.py +++ b/server/links/scitt/__init__.py @@ -1,28 +1,22 @@ import os -import requests from links.scitt import create_hashed_signed_statement, register_signed_statement -from datetime import datetime, timedelta, timezone from fastapi import HTTPException from lib.vcon_redis import VconRedis from lib.logging_utils import init_logger -from starlette.status import HTTP_404_NOT_FOUND, HTTP_501_NOT_IMPLEMENTED - -import hashlib -import json -import requests +from starlette.status import HTTP_404_NOT_FOUND logger = init_logger(__name__) # Increment for any API/attribute changes -link_version = "0.1.0" +link_version = "0.3.0" default_options = { - "client_id": "", - "client_secret": "", - "scrapi_url": "https://app.datatrails.ai/archivist/v2", - "auth_url": "https://app.datatrails.ai/archivist/iam/v1/appidp/token", - "signing_key_path": None, - "issuer": "ANONYMOUS CONSERVER" + "scrapi_url": "http://scittles:8000", + "signing_key_path": "/etc/scitt/signing-key.pem", + "issuer": "conserver", + "key_id": "conserver-key-1", + "vcon_operation": "vcon_created", + "store_receipt": True, } def run( @@ -31,98 +25,105 @@ def run( opts: dict = default_options ) -> str: """ - Main function to run the SCITT link. + SCITT lifecycle registration link. + + Creates a COSE Sign1 signed statement from the vCon hash and registers + it on a SCRAPI-compatible Transparency Service (SCITTLEs). - This function creates a SCITT Signed Statement based on the vCon data, - registering it on a SCITT Transparency Service. + The vcon_operation option controls the lifecycle event type: + - "vcon_created": registered before transcription + - "vcon_enhanced": registered after transcription Args: - vcon_uuid (str): UUID of the vCon to process. - link_name (str): Name of the link (for logging purposes). 
- opts (dict): Options for the link, including API URLs and credentials. + vcon_uuid: UUID of the vCon to process. + link_name: Name of the link instance (for logging). + opts: Configuration options. Returns: - str: The UUID of the processed vCon. - - Raises: - ValueError: If client_id or client_secret is not provided in the options. + The UUID of the processed vCon. """ module_name = __name__.split(".")[-1] - logger.info(f"Starting {module_name}: {link_name} plugin for: {vcon_uuid}") + logger.info(f"Starting {module_name}: {link_name} for: {vcon_uuid}") merged_opts = default_options.copy() merged_opts.update(opts) opts = merged_opts - if not opts["client_id"] or not opts["client_secret"]: - raise ValueError(f"{module_name} client ID and client secret must be provided") - - # Get the vCon + # Get the vCon from Redis vcon_redis = VconRedis() vcon = vcon_redis.get_vcon(vcon_uuid) if not vcon: - logger.info(f"{link_name}: vCon not found: {vcon_uuid}") + logger.info(f"{link_name}: vCon not found: {vcon_uuid}") raise HTTPException( status_code=HTTP_404_NOT_FOUND, detail=f"vCon not found: {vcon_uuid}" ) - ############################### - # Create a Signed Statement - ############################### - - # Set the subject to the vcon identifier - subject = vcon.subject or f"vcon://{vcon_uuid}" - - # SCITT metadata for the vCon - meta_map = { - "vcon_operation" : opts["vcon_operation"] - } - # Set the payload to the hash of the vCon consistent with - # cose-hash-envelope: https://datatracker.ietf.org/doc/draft-steele-cose-hash-envelope - + # Build per-participant SCITT registrations payload = vcon.hash - # TODO: pull hash_alg from the vcon - payload_hash_alg = "SHA-256" - # TODO: pull the payload_location from the vcon.url - payload_location = "" # vcon.url - - key_id = opts["key_id"] + operation = opts["vcon_operation"] - signing_key_path = os.path.join(opts["signing_key_path"]) + signing_key_path = opts["signing_key_path"] signing_key = 
create_hashed_signed_statement.open_signing_key(signing_key_path) - signed_statement = create_hashed_signed_statement.create_hashed_signed_statement( - issuer=opts["issuer"], - signing_key=signing_key, - subject=subject, - kid=key_id.encode('utf-8'), - meta_map=meta_map, - payload=payload.encode('utf-8'), - payload_hash_alg=payload_hash_alg, - payload_location=payload_location, - pre_image_content_type="application/vcon+json" - ) - logger.info(f"signed_statement: {signed_statement}") - - ############################### - # Register the Signed Statement - ############################### - - # Construct an OIDC Auth Object - oidc_flow = opts["OIDC_flow"] - if oidc_flow == "client-credentials": - auth = register_signed_statement.OIDC_Auth(opts) - else: - raise HTTPException( - status_code=HTTP_501_NOT_IMPLEMENTED, - detail=f"OIDC_flow not found or unsupported. OIDC_flow: {oidc_flow}" + # Collect tel URIs from parties (Party objects use attrs, dicts use keys) + party_tels = [] + for party in (vcon.parties or []): + tel = party.get("tel") if isinstance(party, dict) else getattr(party, "tel", None) + if tel: + party_tels.append(tel) + else: + logger.warning(f"{link_name}: party without tel in {vcon_uuid}, skipping") + + # Fall back to vcon:// subject if no parties have tel + if not party_tels: + party_tels = [None] + + scrapi_url = opts["scrapi_url"] + receipts = [] + + for tel in party_tels: + if tel: + subject = f"tel:{tel}" + operation_payload = f"{payload}:{operation}:{tel}" + meta_map = {"vcon_operation": operation, "party_tel": tel} + else: + subject = f"vcon://{vcon_uuid}" + operation_payload = f"{payload}:{operation}" + meta_map = {"vcon_operation": operation} + + signed_statement = create_hashed_signed_statement.create_hashed_signed_statement( + issuer=opts["issuer"], + signing_key=signing_key, + subject=subject, + kid=opts["key_id"].encode("utf-8"), + meta_map=meta_map, + payload=operation_payload.encode("utf-8"), + payload_hash_alg="SHA-256", + 
payload_location="", + pre_image_content_type="application/vcon+json", ) - - operation_id = register_signed_statement.register_statement( - opts=opts, - auth=auth, - signed_statement=signed_statement - ) - logger.info(f"operation_id: {operation_id}") + logger.info(f"{link_name}: Created signed statement for {vcon_uuid} subject={subject} ({operation})") + + result = register_signed_statement.register_statement(scrapi_url, signed_statement) + logger.info(f"{link_name}: Registered entry_id={result['entry_id']} subject={subject} for {vcon_uuid}") + + receipts.append({ + "entry_id": result["entry_id"], + "vcon_operation": operation, + "subject": subject, + "vcon_hash": payload, + "scrapi_url": scrapi_url, + }) + + # Store receipts as analysis entry on the vCon + if opts.get("store_receipt", True): + vcon.add_analysis( + type="scitt_receipt", + dialog=0, + vendor="scittles", + body=receipts if len(receipts) > 1 else receipts[0], + ) + vcon_redis.store_vcon(vcon) + logger.info(f"{link_name}: Stored {len(receipts)} SCITT receipt(s) for {vcon_uuid}") return vcon_uuid diff --git a/server/links/scitt/register_signed_statement.py b/server/links/scitt/register_signed_statement.py index 3f55fd0..c8058f4 100755 --- a/server/links/scitt/register_signed_statement.py +++ b/server/links/scitt/register_signed_statement.py @@ -1,331 +1,123 @@ -""" Module for submitting a SCITT signed statement to the - DataTrails Transparency Service and optionally returning - a Transparent Statement """ +"""Module for submitting a SCITT signed statement to a + SCRAPI-compatible Transparency Service (e.g. 
SCITTLEs) + and returning the entry ID and receipt.""" -import argparse import logging -import os -import sys -import datetime from time import sleep as time_sleep -from pycose.messages import Sign1Message import requests -# Increment for any API/attribute changes -link_version = "0.1.0" - -# CWT header label comes from version 4 of the scitt architecture document -# https://www.ietf.org/archive/id/draft-ietf-scitt-architecture-04.html#name-issuer-identity -HEADER_LABEL_CWT = 13 - -# Various CWT header labels come from: -# https://www.rfc-editor.org/rfc/rfc8392.html#section-3.1 -HEADER_LABEL_CWT_ISSUER = 1 -HEADER_LABEL_CWT_SUBJECT = 2 - -# CWT CNF header labels come from: -# https://datatracker.ietf.org/doc/html/rfc8747#name-confirmation-claim -HEADER_LABEL_CWT_CNF = 8 -HEADER_LABEL_CNF_COSE_KEY = 1 - # all timeouts and durations are in seconds REQUEST_TIMEOUT = 30 POLL_TIMEOUT = 60 POLL_INTERVAL = 10 -logger = logging.getLogger("check operation status") -logging.basicConfig(level=logging.getLevelName("INFO")) - -class OIDC_Auth: - """ - Handles authentication for SCRAPI API, including token management and refresh. - """ - - def __init__(self, opts:dict): - """ - Initialize the OIDC Auth object - - Args: - opts (dict) containing - auth_url, client_id, client_secret - for the OIDC API - """ - - self.auth_url = opts["auth_url"] - self.client_id = opts["client_id"] - self.client_secret = opts["client_secret"] - self.token = None - self.token_expiry = None +logger = logging.getLogger(__name__) - def get_token(self): - """ - Get a valid authentication token, refreshing if necessary - Returns: - str: A valid authentication token. 
- """ - if self.token is None or datetime.now() >= self.token_expiry: - self._refresh_token() - return self.token - - def _refresh_token(self): - """ - Refresh the authentication token and update the token file - """ - data = { - "grant_type": "client_credentials", - "client_id": self.client_id, - "client_secret": self.client_secret, - } - response = requests.post( - self.auth_url, - data=data, - timeout=REQUEST_TIMEOUT - ) - if response.status_code != 200: - logger.error("FAILED to acquire bearer token") - logger.debug(response) - sys.exit(1) - response.raise_for_status() - - token_data = response.json() - self.token = token_data["access_token"] - # Set token expiry to 5 minutes before actual expiry for safety - self.token_expiry = datetime.now() + timedelta( - seconds=token_data["expires_in"] - 300 - ) - -def get_dt_auth_header() -> str: - """ - Get DataTrails bearer token from OIDC credentials in env +def register_statement(scrapi_url: str, signed_statement: bytes) -> dict: """ - # Pick up credentials from env - client_id = os.environ.get("DATATRAILS_CLIENT_ID") - client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET") + Register a COSE Sign1 signed statement via SCRAPI. 
- if client_id is None or client_secret is None: - logger.error( - "Please configure your DataTrails credentials in the shell environment" - ) - sys.exit(1) - - # Get token from the auth endpoint - response = requests.post( - "https://app.datatrails.ai/archivist/iam/v1/appidp/token", - data={ - "grant_type": "client_credentials", - "client_id": client_id, - "client_secret": client_secret, - }, - timeout=REQUEST_TIMEOUT, - ) - if response.status_code != 200: - logger.error("FAILED to acquire bearer token") - logger.debug(response) - sys.exit(1) - - # Format as a request header - res = response.json() - return f'{res["token_type"]} {res["access_token"]}' - - -def register_statement( - opts: dict, - auth: OIDC_Auth, - signed_statement: bytes -) -> str: - """ - Register the SCITT Signed Statement + Posts the signed statement to the /entries endpoint and handles + both synchronous (201) and asynchronous (303) responses. Args: - opts (dict): Configuration, including the base URL for the DataTrails API. - auth (DataTrailsAuth): Authentication object for DataTrails API. - signed_statement (str): The contents of the signed statement to be posted + scrapi_url: Base URL of the SCRAPI service (e.g. 
http://scittles:8000) + signed_statement: CBOR-encoded COSE Sign1 bytes Returns: - str: The operation ID to poll for completion, and receipts + dict with "entry_id" (str) and "receipt" (bytes) Raises: - requests.HTTPError: If the API request fails + requests.HTTPError: If the registration request fails + TimeoutError: If async registration doesn't complete in time """ - - logger.info("in register_statement") - - headers = { - "Authorization": f"Bearer {auth.get_token()}", - "DataTrails-User-Agent": "oss/conserverlink/" + link_version, - "DataTrails-Partner-ID": opts["partner_id"], - "Content-Type": "application/json", - } - api_url = opts["api_url"] - - # Make the POST request response = requests.post( - url=api_url, - headers=headers, + f"{scrapi_url}/entries", data=signed_statement, - timeout=REQUEST_TIMEOUT + headers={"Content-Type": "application/cose"}, + timeout=REQUEST_TIMEOUT, ) - if response.status_code != 200: - logger.error("FAILED to submit statement") - logger.debug(response) - sys.exit(1) - response.raise_for_status() + if response.status_code == 201: + # Synchronous registration — receipt in body, entry_id in Location header + entry_id = response.headers.get("Location", "").rsplit("/", 1)[-1] + return {"entry_id": entry_id, "receipt": response.content} - res = response.json() - if not "operationID" in res: - logger.error("FAILED No OperationID locator in response") - logger.debug(res) - sys.exit(1) + elif response.status_code == 303: + # Asynchronous registration — poll for completion + location = response.headers["Location"] + entry_id = wait_for_entry_id(scrapi_url, location) + receipt = get_receipt(scrapi_url, entry_id) + return {"entry_id": entry_id, "receipt": receipt} - return res["operationID"] + else: + response.raise_for_status() -def get_operation_status(operation_id: str, headers: dict) -> dict: - """ - Gets the status of a long-running registration operation +def wait_for_entry_id(scrapi_url: str, operation_location: str) -> str: """ - 
response = requests.get( - f"https://app.datatrails.ai/archivist/v1/publicscitt/operations/{operation_id}", - headers=headers, - timeout=REQUEST_TIMEOUT, - ) - - response.raise_for_status() + Poll for an async registration operation to complete. - return response.json() + Args: + scrapi_url: Base URL of the SCRAPI service + operation_location: Location header value from the 303 response + Returns: + The entry_id once registration succeeds -def wait_for_entry_id(operation_id: str, headers: dict) -> str: - """ - Polls for the operation status to be 'succeeded'. + Raises: + TimeoutError: If the operation doesn't complete within POLL_TIMEOUT """ + poll_attempts = int(POLL_TIMEOUT / POLL_INTERVAL) - poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL) - if not logger: - print("logger not set") + # Resolve relative or absolute URL + if operation_location.startswith("http"): + poll_url = operation_location + else: + poll_url = f"{scrapi_url}{operation_location}" - logger.info("starting to poll for operation status 'succeeded'") + logger.info("Polling for registration completion at %s", poll_url) for _ in range(poll_attempts): - try: - operation_status = get_operation_status(operation_id, headers) - - # pylint: disable=fixme - # TODO: ensure get_operation_status handles error cases from the rest request - if ( - "status" in operation_status - and operation_status["status"] == "succeeded" - ): - return operation_status["entryID"] - - except requests.HTTPError as e: - logger.debug("failed getting operation status, error: %s", e) + response = requests.get(poll_url, timeout=REQUEST_TIMEOUT) + if response.status_code == 200: + # Operation complete — extract entry_id + data = response.json() + if "entryID" in data: + return data["entryID"] + elif "entry_id" in data: + return data["entry_id"] + # Fall back to extracting from URL + return poll_url.rsplit("/", 1)[-1] + elif response.status_code == 202: + # Still processing + logger.debug("Registration still pending...") + except 
requests.RequestException as e: + logger.debug("Failed polling operation status: %s", e) time_sleep(POLL_INTERVAL) - raise TimeoutError("signed statement not registered within polling duration") + raise TimeoutError("Signed statement not registered within polling duration") -def attach_receipt( - entry_id: str, - signed_statement_filepath: str, - transparent_statement_file_path: str, - headers: dict -): +def get_receipt(scrapi_url: str, entry_id: str) -> bytes: """ - Given a Signed Statement and a corresponding Entry ID, fetch a Receipt from - the Transparency Service and write out a complete Transparent Statement + Fetch the COSE receipt for a registered entry. + + Args: + scrapi_url: Base URL of the SCRAPI service + entry_id: The entry identifier + + Returns: + COSE receipt bytes """ - # Get the receipt response = requests.get( - f"https://app.datatrails.ai/archivist/v1/publicscitt/entries/{entry_id}/receipt", - headers=headers, + f"{scrapi_url}/entries/{entry_id}", + headers={"Accept": "application/cose"}, timeout=REQUEST_TIMEOUT, ) - if response.status_code != 200: - logger.error("FAILED to get receipt") - logger.debug(response) - sys.exit(1) - - logger.debug(response.content) - - # Open up the signed statement - with open(signed_statement_filepath, "rb") as data_file: - data = data_file.read() - message = Sign1Message.decode(data) - logger.debug(message) - - # Add receipt to the unprotected header and re-encode - message.uhdr["receipts"] = [response.content] - ts = message.encode(sign=False) - - # Write out the updated Transparent Statement - with open(transparent_statement_file_path, "wb") as file: - file.write(ts) - logger.info("File saved successfully") - - -def main(): - """Creates a Transparent Statement""" - - parser = argparse.ArgumentParser(description="Create a signed statement.") - - # Signed Statement file - parser.add_argument( - "--signed-statement-file", - type=str, - help="filepath to the Signed Statement to be registered.", - 
default="signed-statement.cbor", - ) - - # Output file - parser.add_argument( - "--output-file", - type=str, - help="output file to store the Transparent Statement (leave blank to skip saving).", - default="", - ) - - # log level - parser.add_argument( - "--log-level", - type=str, - help="log level. for any individual poll errors use DEBUG, defaults to WARNING", - default="WARNING", - ) - - args = parser.parse_args() - - # logger = logging.getLogger("check operation status") - # logging.basicConfig(level=logging.getLevelName(args.log_level)) - - # Submit Signed Statement to DataTrails - - op_id = register_statement(args.signed_statement_file) - logger.info("Successfully submitted with Operation ID %s", op_id) - - # If the client wants the Transparent Statement, wait for it - if args.output_file != "": - logger.info("Now waiting for registration to complete") - - # Wait for the registration to complete - try: - entry_id = wait_for_entry_id(op_id, auth_headers) - except TimeoutError as e: - logger.error(e) - sys.exit(1) - - logger.info("Fully Registered with Entry ID %s", entry_id) - - # Attach the receipt - attach_receipt( - entry_id, args.signed_statement_file, args.output_file, auth_headers - ) - - -if __name__ == "__main__": - main() + response.raise_for_status() + return response.content diff --git a/server/links/scitt/tests/__init__.py b/server/links/scitt/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/links/scitt/tests/test_scitt.py b/server/links/scitt/tests/test_scitt.py new file mode 100644 index 0000000..516a553 --- /dev/null +++ b/server/links/scitt/tests/test_scitt.py @@ -0,0 +1,354 @@ +""" +Unit tests for the SCITT link — SCRAPI-based lifecycle registration. 
+ +Tests cover: +- register_signed_statement: SCRAPI POST /entries (sync 201, async 303, errors) +- __init__.run: full link flow with mocked Redis and SCRAPI +- Receipt storage as scitt_receipt analysis entries + +NOTE on mock paths: The conserver's __init__.py uses an absolute import +(``from links.scitt import register_signed_statement``), which registers +the module under ``links.scitt.register_signed_statement`` in sys.modules. +All @patch targets must use this path — NOT the ``server.links.scitt.…`` +path that the test file's own imports resolve to — otherwise the mock is +applied to a duplicate module object and the production code never sees it. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock +from requests import Response + +from server.links.scitt import register_signed_statement +from server.links.scitt import run, default_options + +# Use a non-routable URL (RFC 6761) so if mocks fail, tests get a +# ConnectionError instead of hitting the live SCITTLEs container. +SCRAPI_URL = "http://scrapi.test.invalid:9999" + +# The __init__.py does ``from links.scitt import register_signed_statement``, +# so the actual submodules are registered under these paths in sys.modules. +# Patching attributes on these module objects works for both ``links.scitt`` +# and ``server.links.scitt`` callers because they share the same objects. 
+_RSM = "links.scitt.register_signed_statement" +_CHSS = "links.scitt.create_hashed_signed_statement" + + +# ---------------------------- +# register_signed_statement tests +# ---------------------------- + +class TestRegisterStatement: + """Tests for register_signed_statement.register_statement()""" + + def _make_response(self, status_code, content=b"", headers=None): + resp = Response() + resp.status_code = status_code + resp._content = content + if headers: + resp.headers.update(headers) + return resp + + @patch(f"{_RSM}.requests.post") + def test_sync_201_returns_entry_id_and_receipt(self, mock_post): + """201 Created: entry_id from Location header, receipt from body.""" + mock_post.return_value = self._make_response( + 201, + content=b"\xd2\x84\x43", # fake COSE bytes + headers={"Location": "/entries/abc123def456"}, + ) + + result = register_signed_statement.register_statement( + SCRAPI_URL, b"\xd2\x84" + ) + + assert result["entry_id"] == "abc123def456" + assert result["receipt"] == b"\xd2\x84\x43" + mock_post.assert_called_once_with( + f"{SCRAPI_URL}/entries", + data=b"\xd2\x84", + headers={"Content-Type": "application/cose"}, + timeout=register_signed_statement.REQUEST_TIMEOUT, + ) + + @patch(f"{_RSM}.time_sleep") + @patch(f"{_RSM}.requests.get") + @patch(f"{_RSM}.requests.post") + def test_async_303_polls_and_fetches_receipt(self, mock_post, mock_get, mock_sleep): + """303 See Other: poll for entry_id, then fetch receipt.""" + mock_post.return_value = self._make_response( + 303, + headers={"Location": "/operations/op-789"}, + ) + + # First GET: poll returns 200 with entry_id + resp_poll = Mock() + resp_poll.status_code = 200 + resp_poll.json.return_value = {"entryID": "entry-xyz"} + + # Second GET: receipt fetch + resp_receipt = Mock() + resp_receipt.status_code = 200 + resp_receipt.content = b"\xd2\x84\x44" + resp_receipt.raise_for_status = Mock() + + mock_get.side_effect = [resp_poll, resp_receipt] + + result = 
register_signed_statement.register_statement( + SCRAPI_URL, b"\xd2\x84" + ) + + assert result["entry_id"] == "entry-xyz" + assert result["receipt"] == b"\xd2\x84\x44" + # Verify sleep was NOT called (poll succeeded on first attempt) + mock_sleep.assert_not_called() + + @patch(f"{_RSM}.requests.post") + def test_error_status_raises(self, mock_post): + """Non-201/303 responses raise HTTPError.""" + resp = self._make_response(400, content=b"Bad Request") + resp.url = f"{SCRAPI_URL}/entries" + mock_post.return_value = resp + + with pytest.raises(Exception): + register_signed_statement.register_statement( + SCRAPI_URL, b"\xd2\x84" + ) + + +class TestWaitForEntryId: + """Tests for register_signed_statement.wait_for_entry_id()""" + + @patch(f"{_RSM}.time_sleep") + @patch(f"{_RSM}.requests.get") + def test_polls_until_200(self, mock_get, mock_sleep): + """Returns entry_id when poll returns 200 with entryID.""" + resp_pending = Mock() + resp_pending.status_code = 202 + + resp_done = Mock() + resp_done.status_code = 200 + resp_done.json.return_value = {"entryID": "final-entry-id"} + + mock_get.side_effect = [resp_pending, resp_pending, resp_done] + + result = register_signed_statement.wait_for_entry_id( + SCRAPI_URL, "/operations/op-1" + ) + + assert result == "final-entry-id" + assert mock_get.call_count == 3 + assert mock_sleep.call_count == 2 + + @patch(f"{_RSM}.time_sleep") + @patch(f"{_RSM}.requests.get") + def test_timeout_raises(self, mock_get, mock_sleep): + """Raises TimeoutError if polling exhausts all attempts.""" + resp_pending = Mock() + resp_pending.status_code = 202 + mock_get.return_value = resp_pending + + with pytest.raises(TimeoutError, match="not registered"): + register_signed_statement.wait_for_entry_id( + SCRAPI_URL, "/operations/op-1" + ) + + @patch(f"{_RSM}.time_sleep") + @patch(f"{_RSM}.requests.get") + def test_handles_absolute_url(self, mock_get, mock_sleep): + """Supports absolute URLs in the Location header.""" + resp = Mock() + resp.status_code 
= 200 + resp.json.return_value = {"entry_id": "abs-entry"} + mock_get.return_value = resp + + result = register_signed_statement.wait_for_entry_id( + SCRAPI_URL, f"{SCRAPI_URL}/operations/op-1" + ) + + assert result == "abs-entry" + mock_get.assert_called_once_with( + f"{SCRAPI_URL}/operations/op-1", + timeout=register_signed_statement.REQUEST_TIMEOUT, + ) + + +class TestGetReceipt: + """Tests for register_signed_statement.get_receipt()""" + + @patch(f"{_RSM}.requests.get") + def test_returns_receipt_bytes(self, mock_get): + resp = Mock() + resp.status_code = 200 + resp.content = b"\xd2receipt" + resp.raise_for_status = Mock() + mock_get.return_value = resp + + result = register_signed_statement.get_receipt(SCRAPI_URL, "entry-1") + + assert result == b"\xd2receipt" + mock_get.assert_called_once_with( + f"{SCRAPI_URL}/entries/entry-1", + headers={"Accept": "application/cose"}, + timeout=register_signed_statement.REQUEST_TIMEOUT, + ) + + +# ---------------------------- +# SCITT link run() tests +# ---------------------------- + +# Patching run() dependencies requires two different prefixes due to a +# dual-module situation: __init__.py is loaded as BOTH ``server.links.scitt`` +# (via pytest's test imports) and ``links.scitt`` (via the conserver's internal +# absolute import ``from links.scitt import …``). +# +# - Submodule *attributes* (e.g. register_signed_statement.register_statement) +# can be patched via _RSM because the submodule object is shared — both +# module entries hold a reference to the same object. +# - Names imported directly into __init__.py (e.g. VconRedis) must be patched +# on ``server.links.scitt`` because that's the module whose __dict__ the +# ``run()`` function resolves globals from. 
+_SCITT_INIT = "server.links.scitt"
+
+
+class TestScittLinkRun:
+    """Tests for the SCITT link run() function."""
+
+    @pytest.fixture
+    def mock_vcon(self):
+        vcon = Mock()
+        vcon.uuid = "test-uuid-1234"
+        vcon.subject = "tel:+15551234567"
+        vcon.hash = "a1b2c3d4e5f6abcdef1234567890abcdef1234567890abcdef1234567890abcd"
+        vcon.add_analysis = Mock()
+        # Per-participant SCITT iterates over vcon.parties; must be a list
+        vcon.parties = [{"tel": "+15551234567"}]
+        return vcon
+
+    @pytest.fixture
+    def mock_redis(self, mock_vcon):
+        with patch(f"{_SCITT_INIT}.VconRedis") as mock_cls:
+            redis_inst = Mock()
+            redis_inst.get_vcon.return_value = mock_vcon
+            mock_cls.return_value = redis_inst
+            yield redis_inst
+
+    @patch(f"{_RSM}.register_statement")
+    @patch(f"{_CHSS}.create_hashed_signed_statement")
+    @patch(f"{_CHSS}.open_signing_key")
+    def test_run_registers_and_stores_receipt(
+        self, mock_open_key, mock_create_stmt, mock_register, mock_redis, mock_vcon
+    ):
+        """Full run: creates signed statement, registers, stores receipt."""
+        mock_open_key.return_value = Mock()
+        mock_create_stmt.return_value = b"\xd2signed"
+        mock_register.return_value = {
+            "entry_id": "entry-abc123",
+            "receipt": b"\xd2receipt",
+        }
+
+        opts = {
+            "scrapi_url": SCRAPI_URL,
+            "signing_key_path": "/etc/scitt/signing-key.pem",
+            "issuer": "conserver",
+            "key_id": "conserver-key-1",
+            "vcon_operation": "vcon_created",
+            "store_receipt": True,
+        }
+
+        result = run("test-uuid-1234", "scitt_created", opts)
+
+        assert result == "test-uuid-1234"
+
+        # Verify signed statement was created with correct args
+        mock_create_stmt.assert_called_once()
+        call_kwargs = mock_create_stmt.call_args
+        assert call_kwargs.kwargs["issuer"] == "conserver"
+        assert call_kwargs.kwargs["subject"] == "tel:+15551234567"
+        assert call_kwargs.kwargs["meta_map"] == {"vcon_operation": "vcon_created"}
+        assert call_kwargs.kwargs["pre_image_content_type"] == "application/vcon+json"
+
+        # Verify registration
+        mock_register.assert_called_once_with(SCRAPI_URL, b"\xd2signed")
+
+        # Verify receipt stored as analysis
+        mock_vcon.add_analysis.assert_called_once_with(
+            type="scitt_receipt",
+            dialog=0,
+            vendor="scittles",
+            body={
+                "entry_id": "entry-abc123",
+                "vcon_operation": "vcon_created",
+                "subject": "tel:+15551234567",
+                "vcon_hash": mock_vcon.hash,
+                "scrapi_url": SCRAPI_URL,
+            },
+        )
+
+        # Verify vCon saved back to Redis
+        mock_redis.store_vcon.assert_called_once_with(mock_vcon)
+
+    @patch(f"{_RSM}.register_statement")
+    @patch(f"{_CHSS}.create_hashed_signed_statement")
+    @patch(f"{_CHSS}.open_signing_key")
+    def test_run_skips_receipt_storage_when_disabled(
+        self, mock_open_key, mock_create_stmt, mock_register, mock_redis, mock_vcon
+    ):
+        """When store_receipt is False, don't add analysis or save."""
+        mock_open_key.return_value = Mock()
+        mock_create_stmt.return_value = b"\xd2signed"
+        mock_register.return_value = {"entry_id": "entry-1", "receipt": b""}
+
+        opts = {**default_options, "store_receipt": False}
+        result = run("test-uuid-1234", "scitt_created", opts)
+
+        assert result == "test-uuid-1234"
+        mock_vcon.add_analysis.assert_not_called()
+        mock_redis.store_vcon.assert_not_called()
+
+    @patch(f"{_RSM}.register_statement")
+    @patch(f"{_CHSS}.create_hashed_signed_statement")
+    @patch(f"{_CHSS}.open_signing_key")
+    def test_run_with_vcon_enhanced_operation(
+        self, mock_open_key, mock_create_stmt, mock_register, mock_redis, mock_vcon
+    ):
+        """vcon_enhanced operation uses the correct meta_map value."""
+        mock_open_key.return_value = Mock()
+        mock_create_stmt.return_value = b"\xd2signed"
+        mock_register.return_value = {"entry_id": "entry-enh", "receipt": b""}
+
+        opts = {**default_options, "vcon_operation": "vcon_enhanced"}
+        run("test-uuid-1234", "scitt_enhanced", opts)
+
+        call_kwargs = mock_create_stmt.call_args
+        assert call_kwargs.kwargs["meta_map"] == {"vcon_operation": "vcon_enhanced"}
+
+        mock_vcon.add_analysis.assert_called_once()
+        analysis_body = mock_vcon.add_analysis.call_args.kwargs["body"]
+        assert analysis_body["vcon_operation"] == "vcon_enhanced"
+
+    def test_run_raises_on_missing_vcon(self, mock_redis):
+        """Raises HTTPException when vCon not found in Redis."""
+        mock_redis.get_vcon.return_value = None
+
+        from fastapi import HTTPException
+        with pytest.raises(HTTPException) as exc_info:
+            run("nonexistent-uuid", "scitt_created", default_options)
+        assert exc_info.value.status_code == 404
+
+    @patch(f"{_RSM}.register_statement")
+    @patch(f"{_CHSS}.create_hashed_signed_statement")
+    @patch(f"{_CHSS}.open_signing_key")
+    def test_run_uses_fallback_subject(
+        self, mock_open_key, mock_create_stmt, mock_register, mock_redis, mock_vcon
+    ):
+        """When no parties have tel, uses vcon:// URI as subject."""
+        mock_vcon.parties = []  # No parties with tel -> fallback to vcon://
+        mock_open_key.return_value = Mock()
+        mock_create_stmt.return_value = b"\xd2signed"
+        mock_register.return_value = {"entry_id": "entry-1", "receipt": b""}
+
+        run("test-uuid-1234", "scitt_created", default_options)
+
+        call_kwargs = mock_create_stmt.call_args
+        assert call_kwargs.kwargs["subject"] == "vcon://test-uuid-1234"
diff --git a/server/storage/scitt/__init__.py b/server/storage/scitt/__init__.py
new file mode 100644
index 0000000..453fc32
--- /dev/null
+++ b/server/storage/scitt/__init__.py
@@ -0,0 +1,108 @@
+from links.scitt import create_hashed_signed_statement, register_signed_statement
+from server.lib.vcon_redis import VconRedis
+from lib.logging_utils import init_logger
+
+logger = init_logger(__name__)
+
+default_options = {
+    "scrapi_url": "http://scittles:8000",
+    "signing_key_path": "/etc/scitt/signing-key.pem",
+    "issuer": "conserver",
+    "key_id": "conserver-key-1",
+    "operations": ["vcon_enhanced"],
+}
+
+
+def save(vcon_id, opts=default_options):
+    """Register per-participant SCITT entries for a vCon.
+
+    Runs as a storage backend in parallel with other storages (e.g., webhook).
+    Does NOT write receipts back to the vCon in Redis to avoid race conditions
+    with parallel webhook storage. The transparency service is the authoritative
+    store for SCITT receipts.
+
+    Each participant (party with a tel field) gets a separate SCITT entry per
+    operation, with subject set to tel:+number for portal queryability.
+    Falls back to a single vcon:// entry if no parties have tel.
+
+    Args:
+        vcon_id: Identifier of the vCon, passed to VconRedis.get_vcon.
+        opts: Option overrides layered on top of default_options; None means
+            no overrides. "operations" may be a list of operation names or a
+            single name given as a string.
+
+    Returns:
+        None. A vCon that cannot be found is logged and skipped.
+
+    Raises:
+        Nothing is caught here: errors from key loading, signing or
+        registration propagate, so later entries are not attempted.
+    """
+    # Layer caller options over the defaults without mutating either dict.
+    opts = {**default_options, **(opts or {})}
+
+    vcon = VconRedis().get_vcon(vcon_id)
+    if not vcon:
+        logger.warning("scitt storage: vCon not found: %s", vcon_id)
+        return
+
+    operations = opts.get("operations")
+    if operations is None:
+        operations = default_options["operations"]
+    elif isinstance(operations, str):
+        # A bare string would be iterated character by character, registering
+        # one bogus entry per letter; treat it as a single operation name.
+        operations = [operations]
+
+    payload = vcon.hash
+    signing_key = create_hashed_signed_statement.open_signing_key(opts["signing_key_path"])
+
+    # Collect tel values from parties (Party objects use attrs, dicts use keys)
+    party_tels = []
+    for party in (vcon.parties or []):
+        tel = party.get("tel") if isinstance(party, dict) else getattr(party, "tel", None)
+        if tel:
+            party_tels.append(tel)
+        else:
+            logger.warning("scitt storage: party without tel in %s, skipping", vcon_id)
+
+    # Fall back to vcon:// subject if no parties have tel
+    if not party_tels:
+        party_tels = [None]
+
+    # Loop-invariant values, resolved once.
+    scrapi_url = opts["scrapi_url"]
+    issuer = opts["issuer"]
+    kid = opts["key_id"].encode("utf-8")
+
+    for operation in operations:
+        for tel in party_tels:
+            if tel:
+                subject = f"tel:{tel}"
+                operation_payload = f"{payload}:{operation}:{tel}"
+                meta_map = {"vcon_operation": operation, "party_tel": tel}
+            else:
+                subject = f"vcon://{vcon_id}"
+                operation_payload = f"{payload}:{operation}"
+                meta_map = {"vcon_operation": operation}
+
+            signed_statement = create_hashed_signed_statement.create_hashed_signed_statement(
+                issuer=issuer,
+                signing_key=signing_key,
+                subject=subject,
+                kid=kid,
+                meta_map=meta_map,
+                payload=operation_payload.encode("utf-8"),
+                payload_hash_alg="SHA-256",
+                payload_location="",
+                pre_image_content_type="application/vcon+json",
+            )
+
+            result = register_signed_statement.register_statement(scrapi_url, signed_statement)
+            logger.info(
+                "scitt storage: Registered %s entry_id=%s subject=%s for %s",
+                operation,
+                result["entry_id"],
+                subject,
+                vcon_id,
+            )