diff --git a/cwl_utils/cwl_v1_0_expression_refactor.py b/cwl_utils/cwl_v1_0_expression_refactor.py index 8af5929e..c66e7d6f 100755 --- a/cwl_utils/cwl_v1_0_expression_refactor.py +++ b/cwl_utils/cwl_v1_0_expression_refactor.py @@ -7,7 +7,7 @@ import uuid from collections.abc import MutableSequence, Sequence from contextlib import suppress -from typing import Any, Optional, cast +from typing import Any, cast from ruamel import yaml from schema_salad.sourceline import SourceLine @@ -445,7 +445,7 @@ def find_expressionLib( if process.requirements: for req in process.requirements: if isinstance(req, cwl.InlineJavascriptRequirement): - return cast(Optional[list[str]], copy.deepcopy(req.expressionLib)) + return cast(list[str] | None, copy.deepcopy(req.expressionLib)) return None @@ -1695,7 +1695,9 @@ def cltool_step_outputs_to_workflow_outputs( def generate_etool_from_expr2( expr: str, target: cwl.InputParameter, - inputs: Sequence[cwl.InputParameter | cwl.CommandInputParameter], + inputs: Sequence[ + cwl.InputParameter | cwl.CommandInputParameter | cwl.CommandOutputParameter + ], self_name: str | None = None, process: cwl.CommandLineTool | cwl.ExpressionTool | None = None, extra_processes: None | ( @@ -1815,9 +1817,12 @@ def traverse_step( if not target: raise WorkflowException("target not found") input_source_id = None - source_type: None | (list[cwl.InputParameter] | cwl.InputParameter) = ( + source_type: ( None - ) + | MutableSequence[cwl.InputParameter | cwl.CommandOutputParameter] + | cwl.InputParameter + | cwl.CommandOutputParameter + ) = None if inp.source: if isinstance(inp.source, MutableSequence): input_source_id = [] @@ -1904,7 +1909,7 @@ def traverse_step( def workflow_step_to_InputParameters( step_ins: list[cwl.WorkflowStepInput], parent: cwl.Workflow, except_in_id: str -) -> list[cwl.InputParameter]: +) -> list[cwl.InputParameter | cwl.CommandOutputParameter]: """Create InputParameters to match the given WorkflowStep inputs.""" params = [] for inp in 
step_ins: @@ -1915,7 +1920,7 @@ def workflow_step_to_InputParameters( param = copy.deepcopy( utils.param_for_source_id(parent, sourcenames=inp.source) ) - if isinstance(param, list): + if isinstance(param, MutableSequence): for p in param: if not p.type_: raise WorkflowException( @@ -1946,7 +1951,12 @@ def replace_step_valueFrom_expr_with_etool( original_step_ins: list[cwl.WorkflowStepInput], source: str | list[str] | None, replace_etool: bool, - source_type: cwl.InputParameter | list[cwl.InputParameter] | None = None, + source_type: ( + cwl.InputParameter + | cwl.CommandOutputParameter + | MutableSequence[cwl.InputParameter | cwl.CommandOutputParameter] + | None + ) = None, ) -> None: """Replace a WorkflowStep level 'valueFrom' expression with a sibling ExpressionTool step.""" if not step_inp.id: diff --git a/cwl_utils/cwl_v1_1_expression_refactor.py b/cwl_utils/cwl_v1_1_expression_refactor.py index 12bff3d0..3f260e56 100755 --- a/cwl_utils/cwl_v1_1_expression_refactor.py +++ b/cwl_utils/cwl_v1_1_expression_refactor.py @@ -7,7 +7,7 @@ import uuid from collections.abc import MutableSequence, Sequence from contextlib import suppress -from typing import Any, Optional, cast +from typing import Any, cast from ruamel import yaml from schema_salad.sourceline import SourceLine @@ -443,7 +443,7 @@ def find_expressionLib( if process.requirements: for req in process.requirements: if isinstance(req, cwl.InlineJavascriptRequirement): - return cast(Optional[list[str]], copy.deepcopy(req.expressionLib)) + return cast(list[str] | None, copy.deepcopy(req.expressionLib)) return None @@ -1697,7 +1697,11 @@ def cltool_step_outputs_to_workflow_outputs( def generate_etool_from_expr2( expr: str, target: cwl.CommandInputParameter | cwl.WorkflowInputParameter, - inputs: Sequence[cwl.WorkflowInputParameter | cwl.CommandInputParameter], + inputs: Sequence[ + cwl.WorkflowInputParameter + | cwl.CommandInputParameter + | cwl.CommandOutputParameter + ], self_name: str | None = None, process: 
cwl.CommandLineTool | cwl.ExpressionTool | None = None, extra_processes: None | ( @@ -1818,7 +1822,14 @@ def traverse_step( raise WorkflowException("target not found") input_source_id = None source_type: None | ( - list[cwl.WorkflowInputParameter] | cwl.WorkflowInputParameter + MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] + | cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter ) = None if inp.source: if isinstance(inp.source, MutableSequence): @@ -1906,7 +1917,9 @@ def traverse_step( def workflow_step_to_WorkflowInputParameters( step_ins: list[cwl.WorkflowStepInput], parent: cwl.Workflow, except_in_id: str -) -> list[cwl.WorkflowInputParameter]: +) -> MutableSequence[ + cwl.CommandInputParameter | cwl.CommandOutputParameter | cwl.WorkflowInputParameter +]: """Create WorkflowInputParameters to match the given WorkflowStep inputs.""" params = [] for inp in step_ins: @@ -1917,7 +1930,7 @@ def workflow_step_to_WorkflowInputParameters( param = copy.deepcopy( utils.param_for_source_id(parent, sourcenames=inp.source) ) - if isinstance(param, list): + if isinstance(param, MutableSequence): for p in param: p.id = inp_id p.type_ = clean_type_ids(p.type_) @@ -1941,7 +1954,14 @@ def replace_step_valueFrom_expr_with_etool( source: str | list[str] | None, replace_etool: bool, source_type: None | ( - cwl.WorkflowInputParameter | list[cwl.WorkflowInputParameter] + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + | MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] ) = None, ) -> None: """Replace a WorkflowStep level 'valueFrom' expression with a sibling ExpressionTool step.""" diff --git a/cwl_utils/cwl_v1_2_expression_refactor.py b/cwl_utils/cwl_v1_2_expression_refactor.py index eb484a57..3d7f86e7 100755 --- a/cwl_utils/cwl_v1_2_expression_refactor.py +++ 
b/cwl_utils/cwl_v1_2_expression_refactor.py @@ -7,7 +7,7 @@ import uuid from collections.abc import Mapping, MutableSequence, Sequence from contextlib import suppress -from typing import Any, Optional, cast +from typing import Any, cast from ruamel import yaml from schema_salad.sourceline import SourceLine @@ -442,7 +442,7 @@ def find_expressionLib( if process.requirements: for req in process.requirements: if isinstance(req, cwl.InlineJavascriptRequirement): - return cast(Optional[list[str]], copy.deepcopy(req.expressionLib)) + return cast(list[str] | None, copy.deepcopy(req.expressionLib)) return None @@ -725,7 +725,9 @@ def process_workflow_inputs_and_outputs( source_type_items.append("null") elif source_type_items != "null": source_type_items = ["null", source_type_items] - source_type = cwl.CommandInputParameter(type_=source_type_items) + source_type = cwl.CommandInputParameter( + id=None, type_=source_type_items + ) replace_expr_with_etool( expression, etool_id, @@ -1798,7 +1800,11 @@ def cltool_step_outputs_to_workflow_outputs( def generate_etool_from_expr2( expr: str, target: cwl.CommandInputParameter | cwl.WorkflowInputParameter, - inputs: Sequence[cwl.WorkflowInputParameter | cwl.CommandInputParameter], + inputs: Sequence[ + cwl.WorkflowInputParameter + | cwl.CommandInputParameter + | cwl.CommandOutputParameter + ], self_name: str | None = None, process: cwl.CommandLineTool | cwl.ExpressionTool | None = None, extra_processes: None | ( @@ -1919,7 +1925,14 @@ def traverse_step( raise WorkflowException("target not found") input_source_id = None source_type: None | ( - list[cwl.WorkflowInputParameter] | cwl.WorkflowInputParameter + MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] + | cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter ) = None if inp.source: if isinstance(inp.source, MutableSequence): @@ -2015,7 +2028,9 @@ def traverse_step( def 
workflow_step_to_WorkflowInputParameters( step_ins: list[cwl.WorkflowStepInput], parent: cwl.Workflow, except_in_id: str -) -> list[cwl.WorkflowInputParameter]: +) -> MutableSequence[ + cwl.CommandInputParameter | cwl.CommandOutputParameter | cwl.WorkflowInputParameter +]: """Create WorkflowInputParameters to match the given WorkflowStep inputs.""" params = [] for inp in step_ins: @@ -2026,7 +2041,7 @@ def workflow_step_to_WorkflowInputParameters( param = copy.deepcopy( utils.param_for_source_id(parent, sourcenames=inp.source) ) - if isinstance(param, list): + if isinstance(param, MutableSequence): for p in param: p.id = inp_id p.type_ = clean_type_ids(p.type_) @@ -2050,7 +2065,14 @@ def replace_step_valueFrom_expr_with_etool( source: str | list[str] | None, replace_etool: bool, source_type: None | ( - cwl.WorkflowInputParameter | list[cwl.WorkflowInputParameter] + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + | MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] ) = None, ) -> None: """Replace a WorkflowStep level 'valueFrom' expression with a sibling ExpressionTool step.""" diff --git a/cwl_utils/expression.py b/cwl_utils/expression.py index 90448c8a..c5a3c39c 100644 --- a/cwl_utils/expression.py +++ b/cwl_utils/expression.py @@ -1,12 +1,11 @@ # SPDX-License-Identifier: Apache-2.0 """CWL Expression parsing.""" import asyncio -import copy import inspect import json -from collections.abc import Awaitable, MutableMapping +from collections.abc import Awaitable, Container from enum import Enum -from typing import Any, Union, cast +from typing import Any, Final, cast from schema_salad.utils import json_dumps @@ -21,6 +20,15 @@ ) from cwl_utils.utils import bytes2str_in_dicts +OLD_ESCAPE_CWL_VERSIONS: Final[Container[str]] = ( + "v1.0", + "v1.1.0-dev1", + "v1.1", + "v1.2.0-dev1", + "v1.2.0-dev2", + "v1.2.0-dev3", +) + def _convert_dumper(string: str) -> str: return 
f"{json.dumps(string)} + " @@ -293,9 +301,7 @@ def do_eval( :param timeout: The maximum number of seconds to wait while executing. """ - runtime = cast(MutableMapping[str, Union[int, str, None]], copy.deepcopy(resources)) - runtime["tmpdir"] = tmpdir or None - runtime["outdir"] = outdir or None + runtime = resources | {"tmpdir": tmpdir or None, "outdir": outdir or None} rootvars = cast( CWLParameterContext, @@ -319,19 +325,7 @@ def do_eval( fullJS=fullJS, jslib=jslib, strip_whitespace=strip_whitespace, - escaping_behavior=( - 1 - if cwlVersion - in ( - "v1.0", - "v1.1.0-dev1", - "v1.1", - "v1.2.0-dev1", - "v1.2.0-dev2", - "v1.2.0-dev3", - ) - else 2 - ), + escaping_behavior=1 if cwlVersion in OLD_ESCAPE_CWL_VERSIONS else 2, **kwargs, ) diff --git a/cwl_utils/expression_refactor.py b/cwl_utils/expression_refactor.py index 9784cd64..6439be9f 100755 --- a/cwl_utils/expression_refactor.py +++ b/cwl_utils/expression_refactor.py @@ -8,7 +8,7 @@ import sys from collections.abc import Callable, MutableMapping, MutableSequence from pathlib import Path -from typing import Any, Optional, Protocol, Union +from typing import Any, Protocol from ruamel.yaml.main import YAML from ruamel.yaml.scalarstring import walk_tree @@ -28,9 +28,9 @@ _logger.setLevel(logging.INFO) _cwlutilslogger.setLevel(100) -save_type = Optional[ - Union[MutableMapping[str, Any], MutableSequence[Any], int, float, bool, str] -] +save_type = ( + MutableMapping[str, Any] | MutableSequence[Any] | int | float | bool | str | None +) class saveCWL(Protocol): diff --git a/cwl_utils/inputs_schema_gen.py b/cwl_utils/inputs_schema_gen.py index e8552410..d78228f8 100644 --- a/cwl_utils/inputs_schema_gen.py +++ b/cwl_utils/inputs_schema_gen.py @@ -12,7 +12,7 @@ from copy import deepcopy from importlib.resources import files from pathlib import Path -from typing import Any, TypeGuard, Union +from typing import Any, TypeGuard from urllib.parse import urlparse import requests @@ -61,9 +61,9 @@ } # Some type hinting 
-InputType = Union[ - InputArraySchema, InputEnumSchema, InputRecordSchema, str, File, Directory -] +InputType = ( + InputArraySchema | InputEnumSchema | InputRecordSchema | str | File | Directory +) # Don't need type checking at runtime diff --git a/cwl_utils/parser/__init__.py b/cwl_utils/parser/__init__.py index bf56e85e..6e105072 100644 --- a/cwl_utils/parser/__init__.py +++ b/cwl_utils/parser/__init__.py @@ -294,24 +294,49 @@ def load_document_by_uri( loadingOptions = cwl_v1_0.LoadingOptions( fileuri=real_uri, baseuri=base_uri, copyfrom=loadingOptions ) + return load_document_by_string( + loadingOptions.fetcher.fetch_text(real_uri), + real_uri, + loadingOptions, + id_, + load_all, + ) case cwl_v1_1.LoadingOptions(): loadingOptions = cwl_v1_1.LoadingOptions( fileuri=real_uri, baseuri=base_uri, copyfrom=loadingOptions ) + return load_document_by_string( + loadingOptions.fetcher.fetch_text(real_uri), + real_uri, + loadingOptions, + id_, + load_all, + ) case cwl_v1_2.LoadingOptions(): loadingOptions = cwl_v1_2.LoadingOptions( fileuri=real_uri, baseuri=base_uri, copyfrom=loadingOptions ) + return load_document_by_string( + loadingOptions.fetcher.fetch_text(real_uri), + real_uri, + loadingOptions, + id_, + load_all, + ) case None: loadingOptions = cwl_v1_2.LoadingOptions(fileuri=real_uri, baseuri=base_uri) + return load_document_by_string( + loadingOptions.fetcher.fetch_text(real_uri), + real_uri, + None, + id_, + load_all, + ) case _: raise ValidationException( f"Unsupported loadingOptions type: {type(loadingOptions)}" ) - doc = loadingOptions.fetcher.fetch_text(real_uri) - return load_document_by_string(doc, real_uri, loadingOptions, id_, load_all) - def load_document( doc: Any, diff --git a/cwl_utils/parser/cwl_v1_0.py b/cwl_utils/parser/cwl_v1_0.py index 59242c7f..7f3110dd 100644 --- a/cwl_utils/parser/cwl_v1_0.py +++ b/cwl_utils/parser/cwl_v1_0.py @@ -14,7 +14,7 @@ from collections.abc import MutableMapping, MutableSequence, Sequence from io import StringIO 
from itertools import chain -from typing import Any, Optional, Union, cast +from typing import Any, Final, Optional, Union, cast from urllib.parse import quote, urldefrag, urlparse, urlsplit, urlunsplit from urllib.request import pathname2url @@ -30,28 +30,28 @@ _vocab: dict[str, str] = {} _rvocab: dict[str, str] = {} -_logger = logging.getLogger("salad") +_logger: Final = logging.getLogger("salad") IdxType = MutableMapping[str, tuple[Any, "LoadingOptions"]] class LoadingOptions: - idx: IdxType - fileuri: Optional[str] - baseuri: str - namespaces: MutableMapping[str, str] - schemas: MutableSequence[str] - original_doc: Optional[Any] - addl_metadata: MutableMapping[str, Any] - fetcher: Fetcher - vocab: dict[str, str] - rvocab: dict[str, str] - cache: CacheType - imports: list[str] - includes: list[str] - no_link_check: Optional[bool] - container: Optional[str] + idx: Final[IdxType] + fileuri: Final[Optional[str]] + baseuri: Final[str] + namespaces: Final[MutableMapping[str, str]] + schemas: Final[MutableSequence[str]] + original_doc: Final[Optional[Any]] + addl_metadata: Final[MutableMapping[str, Any]] + fetcher: Final[Fetcher] + vocab: Final[dict[str, str]] + rvocab: Final[dict[str, str]] + cache: Final[CacheType] + imports: Final[list[str]] + includes: Final[list[str]] + no_link_check: Final[Optional[bool]] + container: Final[Optional[str]] def __init__( self, @@ -73,59 +73,69 @@ def __init__( self.original_doc = original_doc if idx is not None: - self.idx = idx + temp_idx = idx else: - self.idx = copyfrom.idx if copyfrom is not None else {} + temp_idx = copyfrom.idx if copyfrom is not None else {} + self.idx = temp_idx if fileuri is not None: - self.fileuri = fileuri + temp_fileuri: Optional[str] = fileuri else: - self.fileuri = copyfrom.fileuri if copyfrom is not None else None + temp_fileuri = copyfrom.fileuri if copyfrom is not None else None + self.fileuri = temp_fileuri if baseuri is not None: - self.baseuri = baseuri + temp_baseuri = baseuri else: - 
self.baseuri = copyfrom.baseuri if copyfrom is not None else "" + temp_baseuri = copyfrom.baseuri if copyfrom is not None else "" + self.baseuri = temp_baseuri if namespaces is not None: - self.namespaces = namespaces + temp_namespaces: MutableMapping[str, str] = namespaces else: - self.namespaces = copyfrom.namespaces if copyfrom is not None else {} + temp_namespaces = copyfrom.namespaces if copyfrom is not None else {} + self.namespaces = temp_namespaces if schemas is not None: - self.schemas = schemas + temp_schemas: MutableSequence[str] = schemas else: - self.schemas = copyfrom.schemas if copyfrom is not None else [] + temp_schemas = copyfrom.schemas if copyfrom is not None else [] + self.schemas = temp_schemas if addl_metadata is not None: - self.addl_metadata = addl_metadata + temp_addl_metadata: MutableMapping[str, Any] = addl_metadata else: - self.addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + temp_addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + self.addl_metadata = temp_addl_metadata if imports is not None: - self.imports = imports + temp_imports = imports else: - self.imports = copyfrom.imports if copyfrom is not None else [] + temp_imports = copyfrom.imports if copyfrom is not None else [] + self.imports = temp_imports if includes is not None: - self.includes = includes + temp_includes = includes else: - self.includes = copyfrom.includes if copyfrom is not None else [] + temp_includes = copyfrom.includes if copyfrom is not None else [] + self.includes = temp_includes if no_link_check is not None: - self.no_link_check = no_link_check + temp_no_link_check: Optional[bool] = no_link_check else: - self.no_link_check = copyfrom.no_link_check if copyfrom is not None else False + temp_no_link_check = copyfrom.no_link_check if copyfrom is not None else False + self.no_link_check = temp_no_link_check if container is not None: - self.container = container + temp_container: Optional[str] = container else: - 
self.container = copyfrom.container if copyfrom is not None else None + temp_container = copyfrom.container if copyfrom is not None else None + self.container = temp_container if fetcher is not None: - self.fetcher = fetcher + temp_fetcher = fetcher elif copyfrom is not None: - self.fetcher = copyfrom.fetcher + temp_fetcher = copyfrom.fetcher else: import requests from cachecontrol.caches import SeparateBodyFileCache @@ -136,19 +146,22 @@ def __init__( requests.Session(), cache=SeparateBodyFileCache(root / ".cache" / "salad"), ) - self.fetcher: Fetcher = DefaultFetcher({}, session) + temp_fetcher = DefaultFetcher({}, session) + self.fetcher = temp_fetcher self.cache = self.fetcher.cache if isinstance(self.fetcher, MemoryCachingFetcher) else {} - self.vocab = _vocab - self.rvocab = _rvocab - - if self.namespaces is not None: - self.vocab = self.vocab.copy() - self.rvocab = self.rvocab.copy() + if self.namespaces != {}: + temp_vocab = _vocab.copy() + temp_rvocab = _rvocab.copy() for k, v in self.namespaces.items(): - self.vocab[k] = v - self.rvocab[v] = k + temp_vocab[k] = v + temp_rvocab[v] = k + else: + temp_vocab = _vocab + temp_rvocab = _rvocab + self.vocab = temp_vocab + self.rvocab = temp_rvocab @property def graph(self) -> Graph: @@ -156,7 +169,7 @@ def graph(self) -> Graph: graph = Graph() if not self.schemas: return graph - key = str(hash(tuple(self.schemas))) + key: Final = str(hash(tuple(self.schemas))) if key in self.cache: return cast(Graph, self.cache[key]) for schema in self.schemas: @@ -221,20 +234,20 @@ def load_field( if "$import" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) + url1: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) result, metadata = _document_load_by_url( fieldtype, - url, + url1, loadingOptions, ) - loadingOptions.imports.append(url) + 
loadingOptions.imports.append(url1) return result if "$include" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) - val = loadingOptions.fetcher.fetch_text(url) - loadingOptions.includes.append(url) + url2: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) + val = loadingOptions.fetcher.fetch_text(url2) + loadingOptions.includes.append(url2) return fieldtype.load(val, baseuri, loadingOptions, lc=lc) @@ -243,7 +256,7 @@ def load_field( def extract_type(val_type: type[Any]) -> str: """Take a type of value, and extracts the value as a string.""" - val_str = str(val_type) + val_str: Final = str(val_type) return val_str.split("'")[1] @@ -264,10 +277,10 @@ def parse_errors(error_message: str) -> tuple[str, str, str]: """Parse error messages from several loaders into one error message.""" if not error_message.startswith("Expected"): return error_message, "", "" - vals = error_message.split("\n") + vals: Final = error_message.split("\n") if len(vals) == 1: return error_message, "", "" - types = set() + types1: Final = set() for val in vals: individual_vals = val.split(" ") if val == "": @@ -275,27 +288,29 @@ if individual_vals[1] == "one": individual_vals = val.split("(")[1].split(",") for t in individual_vals: - types.add(t.strip(" ").strip(")\n")) + types1.add(t.strip(" ").strip(")\n")) elif individual_vals[2] == "<class": - types.add(individual_vals[3].strip(">").replace("'", "")) + types1.add(individual_vals[3].strip(">").replace("'", "")) elif individual_vals[0] == "Value": - types.add(individual_vals[-1].strip(".")) + types1.add(individual_vals[-1].strip(".")) else: - types.add(individual_vals[1].replace(",", "")) - types = {val for val in types if val != "NoneType"} - if "str" in types: - types = {convert_typing(val) for val in types if "'" not in val} + 
types1.add(individual_vals[1].replace(",", "")) + types2: Final = {val for val in types1 if val != "NoneType"} + if "str" in types2: + types3 = {convert_typing(val) for val in types2 if "'" not in val} + else: + types3 = types2 to_print = "" - for val in types: + for val in types3: if "'" in val: - to_print = "value" if len(types) == 1 else "values" + to_print = "value" if len(types3) == 1 else "values" if to_print == "": - to_print = "type" if len(types) == 1 else "types" + to_print = "type" if len(types3) == 1 else "types" - verb_tensage = "is" if len(types) == 1 else "are" + verb_tensage: Final = "is" if len(types3) == 1 else "are" - return str(types).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage + return str(types3).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage def save( @@ -309,7 +324,7 @@ def save( if isinstance(val, MutableSequence): return [save(v, top=False, base_url=base_url, relative_uris=relative_uris) for v in val] if isinstance(val, MutableMapping): - newdict = {} + newdict: Final = {} for key in val: newdict[key] = save(val[key], top=False, base_url=base_url, relative_uris=relative_uris) return newdict @@ -326,7 +341,7 @@ def save_with_metadata( relative_uris: bool = True, ) -> save_type: """Save and set $namespaces, $schemas, $base and any other metadata fields at the top level.""" - saved_val = save(val, top, base_url, relative_uris) + saved_val: Final = save(val, top, base_url, relative_uris) newdict: MutableMapping[str, Any] = {} if isinstance(saved_val, MutableSequence): newdict = {"$graph": saved_val} @@ -361,30 +376,30 @@ def expand_url( return url if bool(loadingOptions.vocab) and ":" in url: - prefix = url.split(":")[0] + prefix: Final = url.split(":")[0] if prefix in loadingOptions.vocab: url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :] - split = urlsplit(url) + split1: Final = urlsplit(url) if ( - (bool(split.scheme) and split.scheme in 
loadingOptions.fetcher.supported_schemes()) + (bool(split1.scheme) and split1.scheme in loadingOptions.fetcher.supported_schemes()) or url.startswith("$(") or url.startswith("${") ): pass - elif scoped_id and not bool(split.fragment): - splitbase = urlsplit(base_url) - frg = "" - if bool(splitbase.fragment): - frg = splitbase.fragment + "/" + split.path + elif scoped_id and not bool(split1.fragment): + splitbase1: Final = urlsplit(base_url) + frg: str + if bool(splitbase1.fragment): + frg = splitbase1.fragment + "/" + split1.path else: - frg = split.path - pt = splitbase.path if splitbase.path != "" else "/" - url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)) - elif scoped_ref is not None and not bool(split.fragment): - splitbase = urlsplit(base_url) - sp = splitbase.fragment.split("/") + frg = split1.path + pt: Final = splitbase1.path if splitbase1.path != "" else "/" + url = urlunsplit((splitbase1.scheme, splitbase1.netloc, pt, splitbase1.query, frg)) + elif scoped_ref is not None and not bool(split1.fragment): + splitbase2: Final = urlsplit(base_url) + sp = splitbase2.fragment.split("/") n = scoped_ref while n > 0 and len(sp) > 0: sp.pop() @@ -392,10 +407,10 @@ def expand_url( sp.append(url) url = urlunsplit( ( - splitbase.scheme, - splitbase.netloc, - splitbase.path, - splitbase.query, + splitbase2.scheme, + splitbase2.netloc, + splitbase2.path, + splitbase2.query, "/".join(sp), ) ) @@ -403,8 +418,8 @@ def expand_url( url = loadingOptions.fetcher.urljoin(base_url, url) if vocab_term: - split = urlsplit(url) - if bool(split.scheme): + split2: Final = urlsplit(url) + if bool(split2.scheme): if url in loadingOptions.rvocab: return loadingOptions.rvocab[url] else: @@ -441,7 +456,7 @@ def load( class _PrimitiveLoader(_Loader): def __init__(self, tp: Union[type, tuple[type[str], type[str]]]) -> None: - self.tp = tp + self.tp: Final = tp def load( self, @@ -461,7 +476,7 @@ def __repr__(self) -> str: class _ArrayLoader(_Loader): def 
__init__(self, items: _Loader) -> None: - self.items = items + self.items: Final = items def load( self, @@ -476,9 +491,9 @@ def load( f"Value is a {convert_typing(extract_type(type(doc)))}, " f"but valid type for this field is an array." ) - r: list[Any] = [] - errors: list[SchemaSaladException] = [] - fields: list[str] = [] + r: Final[list[Any]] = [] + errors: Final[list[SchemaSaladException]] = [] + fields: Final[list[str]] = [] for i in range(0, len(doc)): try: lf = load_field( @@ -524,10 +539,10 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.values = values - self.name = name - self.container = container - self.no_link_check = no_link_check + self.values: Final = values + self.name: Final = name + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -543,8 +558,8 @@ def load( loadingOptions = LoadingOptions( copyfrom=loadingOptions, container=self.container, no_link_check=self.no_link_check ) - r: dict[str, Any] = {} - errors: list[SchemaSaladException] = [] + r: Final[dict[str, Any]] = {} + errors: Final[list[SchemaSaladException]] = [] for k, v in doc.items(): try: lf = load_field(v, self.values, baseuri, loadingOptions, lc) @@ -561,8 +576,8 @@ def __repr__(self) -> str: class _EnumLoader(_Loader): def __init__(self, symbols: Sequence[str], name: str) -> None: - self.symbols = symbols - self.name = name + self.symbols: Final = symbols + self.name: Final = name def load( self, @@ -582,7 +597,7 @@ def __repr__(self) -> str: class _SecondaryDSLLoader(_Loader): def __init__(self, inner: _Loader) -> None: - self.inner = inner + self.inner: Final = inner def load( self, @@ -592,7 +607,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - r: list[dict[str, Any]] = [] + r: Final[list[dict[str, Any]]] = [] if isinstance(doc, MutableSequence): for d in doc: if isinstance(d, str): @@ -601,15 +616,15 @@ def load( else: 
r.append({"pattern": d}) elif isinstance(d, dict): - new_dict: dict[str, Any] = {} + new_dict1: dict[str, Any] = {} dict_copy = copy.deepcopy(d) if "pattern" in dict_copy: - new_dict["pattern"] = dict_copy.pop("pattern") + new_dict1["pattern"] = dict_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {d}" ) - new_dict["required"] = ( + new_dict1["required"] = ( dict_copy.pop("required") if "required" in dict_copy else None ) @@ -619,28 +634,28 @@ def load( dict_copy ) ) - r.append(new_dict) + r.append(new_dict1) else: raise ValidationException( "Expected a string or sequence of (strings or mappings)." ) elif isinstance(doc, MutableMapping): - new_dict = {} - doc_copy = copy.deepcopy(doc) + new_dict2: Final = {} + doc_copy: Final = copy.deepcopy(doc) if "pattern" in doc_copy: - new_dict["pattern"] = doc_copy.pop("pattern") + new_dict2["pattern"] = doc_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {doc}" ) - new_dict["required"] = doc_copy.pop("required") if "required" in doc_copy else None + new_dict2["required"] = doc_copy.pop("required") if "required" in doc_copy else None if len(doc_copy): raise ValidationException( f"Unallowed values in secondaryFiles specification entry: {doc_copy}" ) - r.append(new_dict) + r.append(new_dict2) elif isinstance(doc, str): if doc.endswith("?"): @@ -659,9 +674,9 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.classtype = classtype - self.container = container - self.no_link_check = no_link_check + self.classtype: Final = classtype + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -688,7 +703,7 @@ def __repr__(self) -> str: class _ExpressionLoader(_Loader): def __init__(self, items: type[str]) -> None: - self.items = items + self.items: Final = items def load( self, @@ -709,7 +724,7 @@ def load( class 
_UnionLoader(_Loader): def __init__(self, alternates: Sequence[_Loader], name: Optional[str] = None) -> None: self.alternates = alternates - self.name = name + self.name: Final = name def add_loaders(self, loaders: Sequence[_Loader]) -> None: self.alternates = tuple(loader for loader in chain(self.alternates, loaders)) @@ -722,7 +737,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - errors = [] + errors: Final = [] if lc is None: lc = [] @@ -805,11 +820,11 @@ def __init__( scoped_ref: Optional[int], no_link_check: Optional[bool], ) -> None: - self.inner = inner - self.scoped_id = scoped_id - self.vocab_term = vocab_term - self.scoped_ref = scoped_ref - self.no_link_check = no_link_check + self.inner: Final = inner + self.scoped_id: Final = scoped_id + self.vocab_term: Final = vocab_term + self.scoped_ref: Final = scoped_ref + self.no_link_check: Final = no_link_check def load( self, @@ -824,7 +839,7 @@ def load( copyfrom=loadingOptions, no_link_check=self.no_link_check ) if isinstance(doc, MutableSequence): - newdoc = [] + newdoc: Final = [] for i in doc: if isinstance(i, str): newdoc.append( @@ -851,7 +866,7 @@ def load( ) if isinstance(doc, str): if not loadingOptions.no_link_check: - errors = [] + errors: Final = [] try: if not loadingOptions.fetcher.check_exists(doc): errors.append( @@ -866,9 +881,9 @@ def load( class _TypeDSLLoader(_Loader): def __init__(self, inner: _Loader, refScope: Optional[int], salad_version: str) -> None: - self.inner = inner - self.refScope = refScope - self.salad_version = salad_version + self.inner: Final = inner + self.refScope: Final = refScope + self.salad_version: Final = salad_version def resolve( self, @@ -883,9 +898,9 @@ def resolve( doc_ = doc_[0:-1] if doc_.endswith("[]"): - salad_versions = [int(v) for v in self.salad_version[1:].split(".")] + salad_versions: Final = [int(v) for v in self.salad_version[1:].split(".")] items: Union[list[Union[dict[str, Any], str]], dict[str, Any], str] 
= "" - rest = doc_[0:-2] + rest: Final = doc_[0:-2] if salad_versions < [1, 3]: if rest.endswith("[]"): # To show the error message with the original type @@ -914,7 +929,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if isinstance(doc, MutableSequence): - r: list[Any] = [] + r: Final[list[Any]] = [] for d in doc: if isinstance(d, str): resolved = self.resolve(d, baseuri, loadingOptions) @@ -936,9 +951,9 @@ def load( class _IdMapLoader(_Loader): def __init__(self, inner: _Loader, mapSubject: str, mapPredicate: Optional[str]) -> None: - self.inner = inner - self.mapSubject = mapSubject - self.mapPredicate = mapPredicate + self.inner: Final = inner + self.mapSubject: Final = mapSubject + self.mapPredicate: Final = mapPredicate def load( self, @@ -949,7 +964,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if isinstance(doc, MutableMapping): - r: list[Any] = [] + r: Final[list[Any]] = [] for k in doc.keys(): val = doc[k] if isinstance(val, CommentedMap): @@ -989,13 +1004,13 @@ def _document_load( ) if isinstance(doc, MutableMapping): - addl_metadata = {} + addl_metadata: Final = {} if addl_metadata_fields is not None: for mf in addl_metadata_fields: if mf in doc: addl_metadata[mf] = doc[mf] - docuri = baseuri + docuri: Final = baseuri if "$base" in doc: baseuri = doc["$base"] @@ -1007,22 +1022,22 @@ def _document_load( addl_metadata=addl_metadata, ) - doc = copy.copy(doc) - if "$namespaces" in doc: - doc.pop("$namespaces") - if "$schemas" in doc: - doc.pop("$schemas") - if "$base" in doc: - doc.pop("$base") + doc2: Final = copy.copy(doc) + if "$namespaces" in doc2: + doc2.pop("$namespaces") + if "$schemas" in doc2: + doc2.pop("$schemas") + if "$base" in doc2: + doc2.pop("$base") - if "$graph" in doc: + if "$graph" in doc2: loadingOptions.idx[baseuri] = ( - loader.load(doc["$graph"], baseuri, loadingOptions), + loader.load(doc2["$graph"], baseuri, loadingOptions), loadingOptions, ) else: loadingOptions.idx[baseuri] = ( - loader.load(doc, baseuri, 
loadingOptions, docRoot=baseuri), + loader.load(doc2, baseuri, loadingOptions, docRoot=baseuri), loadingOptions, ) @@ -1054,11 +1069,11 @@ def _document_load_by_url( doc_url, frg = urldefrag(url) - text = loadingOptions.fetcher.fetch_text(doc_url) - textIO = StringIO(text) + text: Final = loadingOptions.fetcher.fetch_text(doc_url) + textIO: Final = StringIO(text) textIO.name = str(doc_url) - yaml = yaml_no_ts() - result = yaml.load(textIO) + yaml: Final = yaml_no_ts() + result: Final = yaml.load(textIO) add_lc_filename(result, doc_url) loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=doc_url) @@ -1079,7 +1094,7 @@ def file_uri(path: str, split_frag: bool = False) -> str: if path.startswith("file://"): return path if split_frag: - pathsp = path.split("#", 2) + pathsp: Final = path.split("#", 2) frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else "" urlpath = pathname2url(str(pathsp[0])) else: @@ -1111,8 +1126,8 @@ def save_relative_uri( elif isinstance(uri, str): if not relative_uris or uri == base_url: return uri - urisplit = urlsplit(uri) - basesplit = urlsplit(base_url) + urisplit: Final = urlsplit(uri) + basesplit: Final = urlsplit(base_url) if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc: if urisplit.path != basesplit.path: p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path)) @@ -1143,7 +1158,7 @@ def shortname(inputid: str) -> str: See https://w3id.org/cwl/v1.2/SchemaSalad.html#Short_names. 
""" - parsed_id = urlparse(inputid) + parsed_id: Final = urlparse(inputid) if parsed_id.fragment: return parsed_id.fragment.split("/")[-1] return parsed_id.path.split("/")[-1] diff --git a/cwl_utils/parser/cwl_v1_0_utils.py b/cwl_utils/parser/cwl_v1_0_utils.py index 6a647478..2400298b 100644 --- a/cwl_utils/parser/cwl_v1_0_utils.py +++ b/cwl_utils/parser/cwl_v1_0_utils.py @@ -5,7 +5,7 @@ from collections.abc import MutableMapping, MutableSequence from io import StringIO from pathlib import Path -from typing import IO, Any, Union, cast +from typing import Any, IO, cast from urllib.parse import urldefrag from schema_salad.exceptions import ValidationException @@ -48,12 +48,8 @@ def _compare_records( _logger.info( "Record comparison failure for %s and %s\n" "Did not match fields for %s: %s and %s", - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], src - ).name, - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], sink - ).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, src).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, sink).name, key, srcfields.get(key), sinkfields.get(key), @@ -79,8 +75,8 @@ def _compare_type(type1: Any, type2: Any) -> bool: case MutableSequence(), MutableSequence(): if len(type1) != len(type2): return False - for t1 in type1: - if not any(_compare_type(t1, t2) for t2 in type2): + for t3 in type1: + if not any(_compare_type(t3, t2) for t2 in type2): return False return True return bool(type1 == type2) @@ -427,7 +423,7 @@ def type_for_source( """Determine the type for the given sourcenames.""" scatter_context: list[tuple[int, str] | None] = [] params = param_for_source_id(process, sourcenames, parent, scatter_context) - if not isinstance(params, list): + if not isinstance(params, MutableSequence): new_type = params.type_ if scatter_context[0] is not None: if scatter_context[0][1] == "nested_crossproduct": @@ -474,11 +470,15 @@ def param_for_source_id( sourcenames: str | 
list[str], parent: cwl.Workflow | None = None, scatter_context: list[tuple[int, str] | None] | None = None, -) -> list[cwl.InputParameter] | cwl.InputParameter: +) -> ( + cwl.InputParameter + | cwl.CommandOutputParameter + | MutableSequence[cwl.InputParameter | cwl.CommandOutputParameter] +): """Find the process input parameter that matches one of the given sourcenames.""" if isinstance(sourcenames, str): sourcenames = [sourcenames] - params: list[cwl.InputParameter] = [] + params: MutableSequence[cwl.InputParameter | cwl.CommandOutputParameter] = [] for sourcename in sourcenames: if not isinstance(process, cwl.Workflow): for param in process.inputs: diff --git a/cwl_utils/parser/cwl_v1_1.py b/cwl_utils/parser/cwl_v1_1.py index 2705f6ba..744cba7f 100644 --- a/cwl_utils/parser/cwl_v1_1.py +++ b/cwl_utils/parser/cwl_v1_1.py @@ -14,7 +14,7 @@ from collections.abc import MutableMapping, MutableSequence, Sequence from io import StringIO from itertools import chain -from typing import Any, Optional, Union, cast +from typing import Any, Final, Optional, Union, cast from urllib.parse import quote, urldefrag, urlparse, urlsplit, urlunsplit from urllib.request import pathname2url @@ -30,28 +30,28 @@ _vocab: dict[str, str] = {} _rvocab: dict[str, str] = {} -_logger = logging.getLogger("salad") +_logger: Final = logging.getLogger("salad") IdxType = MutableMapping[str, tuple[Any, "LoadingOptions"]] class LoadingOptions: - idx: IdxType - fileuri: Optional[str] - baseuri: str - namespaces: MutableMapping[str, str] - schemas: MutableSequence[str] - original_doc: Optional[Any] - addl_metadata: MutableMapping[str, Any] - fetcher: Fetcher - vocab: dict[str, str] - rvocab: dict[str, str] - cache: CacheType - imports: list[str] - includes: list[str] - no_link_check: Optional[bool] - container: Optional[str] + idx: Final[IdxType] + fileuri: Final[Optional[str]] + baseuri: Final[str] + namespaces: Final[MutableMapping[str, str]] + schemas: Final[MutableSequence[str]] + original_doc: 
Final[Optional[Any]] + addl_metadata: Final[MutableMapping[str, Any]] + fetcher: Final[Fetcher] + vocab: Final[dict[str, str]] + rvocab: Final[dict[str, str]] + cache: Final[CacheType] + imports: Final[list[str]] + includes: Final[list[str]] + no_link_check: Final[Optional[bool]] + container: Final[Optional[str]] def __init__( self, @@ -73,59 +73,69 @@ def __init__( self.original_doc = original_doc if idx is not None: - self.idx = idx + temp_idx = idx else: - self.idx = copyfrom.idx if copyfrom is not None else {} + temp_idx = copyfrom.idx if copyfrom is not None else {} + self.idx = temp_idx if fileuri is not None: - self.fileuri = fileuri + temp_fileuri: Optional[str] = fileuri else: - self.fileuri = copyfrom.fileuri if copyfrom is not None else None + temp_fileuri = copyfrom.fileuri if copyfrom is not None else None + self.fileuri = temp_fileuri if baseuri is not None: - self.baseuri = baseuri + temp_baseuri = baseuri else: - self.baseuri = copyfrom.baseuri if copyfrom is not None else "" + temp_baseuri = copyfrom.baseuri if copyfrom is not None else "" + self.baseuri = temp_baseuri if namespaces is not None: - self.namespaces = namespaces + temp_namespaces: MutableMapping[str, str] = namespaces else: - self.namespaces = copyfrom.namespaces if copyfrom is not None else {} + temp_namespaces = copyfrom.namespaces if copyfrom is not None else {} + self.namespaces = temp_namespaces if schemas is not None: - self.schemas = schemas + temp_schemas: MutableSequence[str] = schemas else: - self.schemas = copyfrom.schemas if copyfrom is not None else [] + temp_schemas = copyfrom.schemas if copyfrom is not None else [] + self.schemas = temp_schemas if addl_metadata is not None: - self.addl_metadata = addl_metadata + temp_addl_metadata: MutableMapping[str, Any] = addl_metadata else: - self.addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + temp_addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + self.addl_metadata = 
temp_addl_metadata if imports is not None: - self.imports = imports + temp_imports = imports else: - self.imports = copyfrom.imports if copyfrom is not None else [] + temp_imports = copyfrom.imports if copyfrom is not None else [] + self.imports = temp_imports if includes is not None: - self.includes = includes + temp_includes = includes else: - self.includes = copyfrom.includes if copyfrom is not None else [] + temp_includes = copyfrom.includes if copyfrom is not None else [] + self.includes = temp_includes if no_link_check is not None: - self.no_link_check = no_link_check + temp_no_link_check: Optional[bool] = no_link_check else: - self.no_link_check = copyfrom.no_link_check if copyfrom is not None else False + temp_no_link_check = copyfrom.no_link_check if copyfrom is not None else False + self.no_link_check = temp_no_link_check if container is not None: - self.container = container + temp_container: Optional[str] = container else: - self.container = copyfrom.container if copyfrom is not None else None + temp_container = copyfrom.container if copyfrom is not None else None + self.container = temp_container if fetcher is not None: - self.fetcher = fetcher + temp_fetcher = fetcher elif copyfrom is not None: - self.fetcher = copyfrom.fetcher + temp_fetcher = copyfrom.fetcher else: import requests from cachecontrol.caches import SeparateBodyFileCache @@ -136,19 +146,22 @@ def __init__( requests.Session(), cache=SeparateBodyFileCache(root / ".cache" / "salad"), ) - self.fetcher: Fetcher = DefaultFetcher({}, session) + temp_fetcher = DefaultFetcher({}, session) + self.fetcher = temp_fetcher self.cache = self.fetcher.cache if isinstance(self.fetcher, MemoryCachingFetcher) else {} - self.vocab = _vocab - self.rvocab = _rvocab - - if self.namespaces is not None: - self.vocab = self.vocab.copy() - self.rvocab = self.rvocab.copy() + if self.namespaces != {}: + temp_vocab = _vocab.copy() + temp_rvocab = _rvocab.copy() for k, v in self.namespaces.items(): - self.vocab[k] = v 
- self.rvocab[v] = k + temp_vocab[k] = v + temp_rvocab[v] = k + else: + temp_vocab = _vocab + temp_rvocab = _rvocab + self.vocab = temp_vocab + self.rvocab = temp_rvocab @property def graph(self) -> Graph: @@ -156,7 +169,7 @@ def graph(self) -> Graph: graph = Graph() if not self.schemas: return graph - key = str(hash(tuple(self.schemas))) + key: Final = str(hash(tuple(self.schemas))) if key in self.cache: return cast(Graph, self.cache[key]) for schema in self.schemas: @@ -221,20 +234,20 @@ def load_field( if "$import" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) + url1: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) result, metadata = _document_load_by_url( fieldtype, - url, + url1, loadingOptions, ) - loadingOptions.imports.append(url) + loadingOptions.imports.append(url1) return result if "$include" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) - val = loadingOptions.fetcher.fetch_text(url) - loadingOptions.includes.append(url) + url2: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) + val = loadingOptions.fetcher.fetch_text(url2) + loadingOptions.includes.append(url2) return fieldtype.load(val, baseuri, loadingOptions, lc=lc) @@ -243,7 +256,7 @@ def load_field( def extract_type(val_type: type[Any]) -> str: """Take a type of value, and extracts the value as a string.""" - val_str = str(val_type) + val_str: Final = str(val_type) return val_str.split("'")[1] @@ -264,10 +277,10 @@ def parse_errors(error_message: str) -> tuple[str, str, str]: """Parse error messages from several loaders into one error message.""" if not error_message.startswith("Expected"): return error_message, "", "" - vals = error_message.split("\n") + 
vals: Final = error_message.split("\n") if len(vals) == 1: return error_message, "", "" - types = set() + types1: Final = set() for val in vals: individual_vals = val.split(" ") if val == "": @@ -275,27 +288,29 @@ def parse_errors(error_message: str) -> tuple[str, str, str]: if individual_vals[1] == "one": individual_vals = val.split("(")[1].split(",") for t in individual_vals: - types.add(t.strip(" ").strip(")\n")) + types1.add(t.strip(" ").strip(")\n")) elif individual_vals[2] == "<class": - types.add(individual_vals[3].strip(">").replace("'", "")) + types1.add(individual_vals[3].strip(">").replace("'", "")) elif individual_vals[0] == "Value": - types.add(individual_vals[-1].strip(".")) + types1.add(individual_vals[-1].strip(".")) else: - types.add(individual_vals[1].replace(",", "")) - types = {val for val in types if val != "NoneType"} - if "str" in types: - types = {convert_typing(val) for val in types if "'" not in val} + types1.add(individual_vals[1].replace(",", "")) + types2: Final = {val for val in types1 if val != "NoneType"} + if "str" in types2: + types3 = {convert_typing(val) for val in types2 if "'" not in val} + else: + types3 = types2 to_print = "" - for val in types: + for val in types3: if "'" in val: - to_print = "value" if len(types) == 1 else "values" + to_print = "value" if len(types3) == 1 else "values" if to_print == "": - to_print = "type" if len(types) == 1 else "types" + to_print = "type" if len(types3) == 1 else "types" - verb_tensage = "is" if len(types) == 1 else "are" + verb_tensage: Final = "is" if len(types3) == 1 else "are" - return str(types).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage + return str(types3).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage def save( @@ -309,7 +324,7 @@ def save( if isinstance(val, MutableSequence): return [save(v, top=False, base_url=base_url, relative_uris=relative_uris) for v in val] if isinstance(val, MutableMapping): - newdict = {} + newdict: Final = {} for key in val: newdict[key] 
= save(val[key], top=False, base_url=base_url, relative_uris=relative_uris) return newdict @@ -326,7 +341,7 @@ def save_with_metadata( relative_uris: bool = True, ) -> save_type: """Save and set $namespaces, $schemas, $base and any other metadata fields at the top level.""" - saved_val = save(val, top, base_url, relative_uris) + saved_val: Final = save(val, top, base_url, relative_uris) newdict: MutableMapping[str, Any] = {} if isinstance(saved_val, MutableSequence): newdict = {"$graph": saved_val} @@ -361,30 +376,30 @@ def expand_url( return url if bool(loadingOptions.vocab) and ":" in url: - prefix = url.split(":")[0] + prefix: Final = url.split(":")[0] if prefix in loadingOptions.vocab: url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :] - split = urlsplit(url) + split1: Final = urlsplit(url) if ( - (bool(split.scheme) and split.scheme in loadingOptions.fetcher.supported_schemes()) + (bool(split1.scheme) and split1.scheme in loadingOptions.fetcher.supported_schemes()) or url.startswith("$(") or url.startswith("${") ): pass - elif scoped_id and not bool(split.fragment): - splitbase = urlsplit(base_url) - frg = "" - if bool(splitbase.fragment): - frg = splitbase.fragment + "/" + split.path + elif scoped_id and not bool(split1.fragment): + splitbase1: Final = urlsplit(base_url) + frg: str + if bool(splitbase1.fragment): + frg = splitbase1.fragment + "/" + split1.path else: - frg = split.path - pt = splitbase.path if splitbase.path != "" else "/" - url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)) - elif scoped_ref is not None and not bool(split.fragment): - splitbase = urlsplit(base_url) - sp = splitbase.fragment.split("/") + frg = split1.path + pt: Final = splitbase1.path if splitbase1.path != "" else "/" + url = urlunsplit((splitbase1.scheme, splitbase1.netloc, pt, splitbase1.query, frg)) + elif scoped_ref is not None and not bool(split1.fragment): + splitbase2: Final = urlsplit(base_url) + sp = 
splitbase2.fragment.split("/") n = scoped_ref while n > 0 and len(sp) > 0: sp.pop() @@ -392,10 +407,10 @@ def expand_url( sp.append(url) url = urlunsplit( ( - splitbase.scheme, - splitbase.netloc, - splitbase.path, - splitbase.query, + splitbase2.scheme, + splitbase2.netloc, + splitbase2.path, + splitbase2.query, "/".join(sp), ) ) @@ -403,8 +418,8 @@ def expand_url( url = loadingOptions.fetcher.urljoin(base_url, url) if vocab_term: - split = urlsplit(url) - if bool(split.scheme): + split2: Final = urlsplit(url) + if bool(split2.scheme): if url in loadingOptions.rvocab: return loadingOptions.rvocab[url] else: @@ -441,7 +456,7 @@ def load( class _PrimitiveLoader(_Loader): def __init__(self, tp: Union[type, tuple[type[str], type[str]]]) -> None: - self.tp = tp + self.tp: Final = tp def load( self, @@ -461,7 +476,7 @@ def __repr__(self) -> str: class _ArrayLoader(_Loader): def __init__(self, items: _Loader) -> None: - self.items = items + self.items: Final = items def load( self, @@ -476,9 +491,9 @@ def load( f"Value is a {convert_typing(extract_type(type(doc)))}, " f"but valid type for this field is an array." 
) - r: list[Any] = [] - errors: list[SchemaSaladException] = [] - fields: list[str] = [] + r: Final[list[Any]] = [] + errors: Final[list[SchemaSaladException]] = [] + fields: Final[list[str]] = [] for i in range(0, len(doc)): try: lf = load_field( @@ -524,10 +539,10 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.values = values - self.name = name - self.container = container - self.no_link_check = no_link_check + self.values: Final = values + self.name: Final = name + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -543,8 +558,8 @@ def load( loadingOptions = LoadingOptions( copyfrom=loadingOptions, container=self.container, no_link_check=self.no_link_check ) - r: dict[str, Any] = {} - errors: list[SchemaSaladException] = [] + r: Final[dict[str, Any]] = {} + errors: Final[list[SchemaSaladException]] = [] for k, v in doc.items(): try: lf = load_field(v, self.values, baseuri, loadingOptions, lc) @@ -561,8 +576,8 @@ def __repr__(self) -> str: class _EnumLoader(_Loader): def __init__(self, symbols: Sequence[str], name: str) -> None: - self.symbols = symbols - self.name = name + self.symbols: Final = symbols + self.name: Final = name def load( self, @@ -582,7 +597,7 @@ def __repr__(self) -> str: class _SecondaryDSLLoader(_Loader): def __init__(self, inner: _Loader) -> None: - self.inner = inner + self.inner: Final = inner def load( self, @@ -592,7 +607,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - r: list[dict[str, Any]] = [] + r: Final[list[dict[str, Any]]] = [] if isinstance(doc, MutableSequence): for d in doc: if isinstance(d, str): @@ -601,15 +616,15 @@ def load( else: r.append({"pattern": d}) elif isinstance(d, dict): - new_dict: dict[str, Any] = {} + new_dict1: dict[str, Any] = {} dict_copy = copy.deepcopy(d) if "pattern" in dict_copy: - new_dict["pattern"] = dict_copy.pop("pattern") + new_dict1["pattern"] = 
dict_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {d}" ) - new_dict["required"] = ( + new_dict1["required"] = ( dict_copy.pop("required") if "required" in dict_copy else None ) @@ -619,28 +634,28 @@ def load( dict_copy ) ) - r.append(new_dict) + r.append(new_dict1) else: raise ValidationException( "Expected a string or sequence of (strings or mappings)." ) elif isinstance(doc, MutableMapping): - new_dict = {} - doc_copy = copy.deepcopy(doc) + new_dict2: Final = {} + doc_copy: Final = copy.deepcopy(doc) if "pattern" in doc_copy: - new_dict["pattern"] = doc_copy.pop("pattern") + new_dict2["pattern"] = doc_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {doc}" ) - new_dict["required"] = doc_copy.pop("required") if "required" in doc_copy else None + new_dict2["required"] = doc_copy.pop("required") if "required" in doc_copy else None if len(doc_copy): raise ValidationException( f"Unallowed values in secondaryFiles specification entry: {doc_copy}" ) - r.append(new_dict) + r.append(new_dict2) elif isinstance(doc, str): if doc.endswith("?"): @@ -659,9 +674,9 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.classtype = classtype - self.container = container - self.no_link_check = no_link_check + self.classtype: Final = classtype + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -688,7 +703,7 @@ def __repr__(self) -> str: class _ExpressionLoader(_Loader): def __init__(self, items: type[str]) -> None: - self.items = items + self.items: Final = items def load( self, @@ -709,7 +724,7 @@ def load( class _UnionLoader(_Loader): def __init__(self, alternates: Sequence[_Loader], name: Optional[str] = None) -> None: self.alternates = alternates - self.name = name + self.name: Final = name def add_loaders(self, loaders: Sequence[_Loader]) -> None: 
self.alternates = tuple(loader for loader in chain(self.alternates, loaders)) @@ -722,7 +737,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - errors = [] + errors: Final = [] if lc is None: lc = [] @@ -805,11 +820,11 @@ def __init__( scoped_ref: Optional[int], no_link_check: Optional[bool], ) -> None: - self.inner = inner - self.scoped_id = scoped_id - self.vocab_term = vocab_term - self.scoped_ref = scoped_ref - self.no_link_check = no_link_check + self.inner: Final = inner + self.scoped_id: Final = scoped_id + self.vocab_term: Final = vocab_term + self.scoped_ref: Final = scoped_ref + self.no_link_check: Final = no_link_check def load( self, @@ -824,7 +839,7 @@ def load( copyfrom=loadingOptions, no_link_check=self.no_link_check ) if isinstance(doc, MutableSequence): - newdoc = [] + newdoc: Final = [] for i in doc: if isinstance(i, str): newdoc.append( @@ -851,7 +866,7 @@ def load( ) if isinstance(doc, str): if not loadingOptions.no_link_check: - errors = [] + errors: Final = [] try: if not loadingOptions.fetcher.check_exists(doc): errors.append( @@ -866,9 +881,9 @@ def load( class _TypeDSLLoader(_Loader): def __init__(self, inner: _Loader, refScope: Optional[int], salad_version: str) -> None: - self.inner = inner - self.refScope = refScope - self.salad_version = salad_version + self.inner: Final = inner + self.refScope: Final = refScope + self.salad_version: Final = salad_version def resolve( self, @@ -883,9 +898,9 @@ def resolve( doc_ = doc_[0:-1] if doc_.endswith("[]"): - salad_versions = [int(v) for v in self.salad_version[1:].split(".")] + salad_versions: Final = [int(v) for v in self.salad_version[1:].split(".")] items: Union[list[Union[dict[str, Any], str]], dict[str, Any], str] = "" - rest = doc_[0:-2] + rest: Final = doc_[0:-2] if salad_versions < [1, 3]: if rest.endswith("[]"): # To show the error message with the original type @@ -914,7 +929,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if 
isinstance(doc, MutableSequence): - r: list[Any] = [] + r: Final[list[Any]] = [] for d in doc: if isinstance(d, str): resolved = self.resolve(d, baseuri, loadingOptions) @@ -936,9 +951,9 @@ def load( class _IdMapLoader(_Loader): def __init__(self, inner: _Loader, mapSubject: str, mapPredicate: Optional[str]) -> None: - self.inner = inner - self.mapSubject = mapSubject - self.mapPredicate = mapPredicate + self.inner: Final = inner + self.mapSubject: Final = mapSubject + self.mapPredicate: Final = mapPredicate def load( self, @@ -949,7 +964,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if isinstance(doc, MutableMapping): - r: list[Any] = [] + r: Final[list[Any]] = [] for k in doc.keys(): val = doc[k] if isinstance(val, CommentedMap): @@ -989,13 +1004,13 @@ def _document_load( ) if isinstance(doc, MutableMapping): - addl_metadata = {} + addl_metadata: Final = {} if addl_metadata_fields is not None: for mf in addl_metadata_fields: if mf in doc: addl_metadata[mf] = doc[mf] - docuri = baseuri + docuri: Final = baseuri if "$base" in doc: baseuri = doc["$base"] @@ -1007,22 +1022,22 @@ def _document_load( addl_metadata=addl_metadata, ) - doc = copy.copy(doc) - if "$namespaces" in doc: - doc.pop("$namespaces") - if "$schemas" in doc: - doc.pop("$schemas") - if "$base" in doc: - doc.pop("$base") + doc2: Final = copy.copy(doc) + if "$namespaces" in doc2: + doc2.pop("$namespaces") + if "$schemas" in doc2: + doc2.pop("$schemas") + if "$base" in doc2: + doc2.pop("$base") - if "$graph" in doc: + if "$graph" in doc2: loadingOptions.idx[baseuri] = ( - loader.load(doc["$graph"], baseuri, loadingOptions), + loader.load(doc2["$graph"], baseuri, loadingOptions), loadingOptions, ) else: loadingOptions.idx[baseuri] = ( - loader.load(doc, baseuri, loadingOptions, docRoot=baseuri), + loader.load(doc2, baseuri, loadingOptions, docRoot=baseuri), loadingOptions, ) @@ -1054,11 +1069,11 @@ def _document_load_by_url( doc_url, frg = urldefrag(url) - text = 
loadingOptions.fetcher.fetch_text(doc_url) - textIO = StringIO(text) + text: Final = loadingOptions.fetcher.fetch_text(doc_url) + textIO: Final = StringIO(text) textIO.name = str(doc_url) - yaml = yaml_no_ts() - result = yaml.load(textIO) + yaml: Final = yaml_no_ts() + result: Final = yaml.load(textIO) add_lc_filename(result, doc_url) loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=doc_url) @@ -1079,7 +1094,7 @@ def file_uri(path: str, split_frag: bool = False) -> str: if path.startswith("file://"): return path if split_frag: - pathsp = path.split("#", 2) + pathsp: Final = path.split("#", 2) frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else "" urlpath = pathname2url(str(pathsp[0])) else: @@ -1111,8 +1126,8 @@ def save_relative_uri( elif isinstance(uri, str): if not relative_uris or uri == base_url: return uri - urisplit = urlsplit(uri) - basesplit = urlsplit(base_url) + urisplit: Final = urlsplit(uri) + basesplit: Final = urlsplit(base_url) if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc: if urisplit.path != basesplit.path: p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path)) @@ -1143,7 +1158,7 @@ def shortname(inputid: str) -> str: See https://w3id.org/cwl/v1.2/SchemaSalad.html#Short_names. """ - parsed_id = urlparse(inputid) + parsed_id: Final = urlparse(inputid) if parsed_id.fragment: return parsed_id.fragment.split("/")[-1] return parsed_id.path.split("/")[-1] @@ -4378,6 +4393,10 @@ class Identified(Saveable): pass +class IdentifierRequired(Identified): + pass + + class LoadContents(Saveable): pass @@ -4394,7 +4413,7 @@ class OutputFormat(Saveable): pass -class Parameter(FieldBase, Documented, Identified): +class Parameter(FieldBase, Documented, IdentifierRequired): """ Define an input or output parameter to a process. 
@@ -9841,150 +9860,7 @@ def save( class CommandLineBindable(Saveable): - def __init__( - self, - inputBinding: Optional[Any] = None, - extension_fields: Optional[dict[str, Any]] = None, - loadingOptions: Optional[LoadingOptions] = None, - ) -> None: - if extension_fields: - self.extension_fields = extension_fields - else: - self.extension_fields = CommentedMap() - if loadingOptions: - self.loadingOptions = loadingOptions - else: - self.loadingOptions = LoadingOptions() - self.inputBinding = inputBinding - - def __eq__(self, other: Any) -> bool: - if isinstance(other, CommandLineBindable): - return bool(self.inputBinding == other.inputBinding) - return False - - def __hash__(self) -> int: - return hash((self.inputBinding)) - - @classmethod - def fromDoc( - cls, - doc: Any, - baseuri: str, - loadingOptions: LoadingOptions, - docRoot: Optional[str] = None - ) -> "CommandLineBindable": - _doc = copy.copy(doc) - - if hasattr(doc, "lc"): - _doc.lc.data = doc.lc.data - _doc.lc.filename = doc.lc.filename - _errors__ = [] - inputBinding = None - if "inputBinding" in _doc: - try: - inputBinding = load_field( - _doc.get("inputBinding"), - union_of_None_type_or_CommandLineBindingLoader, - baseuri, - loadingOptions, - lc=_doc.get("inputBinding") - ) - - except ValidationException as e: - error_message, to_print, verb_tensage = parse_errors(str(e)) - - if str(e) == "missing required field `inputBinding`": - _errors__.append( - ValidationException( - str(e), - None - ) - ) - else: - val = _doc.get("inputBinding") - if error_message != str(e): - val_type = convert_typing(extract_type(type(val))) - _errors__.append( - ValidationException( - "the `inputBinding` field is not valid because:", - SourceLine(_doc, "inputBinding", str), - [ValidationException(f"Value is a {val_type}, " - f"but valid {to_print} for this field " - f"{verb_tensage} {error_message}", - detailed_message=f"Value `{val}` is a {val_type}, " - f"but valid {to_print} for this field " - f"{verb_tensage} 
{error_message}")], - ) - ) - else: - _errors__.append( - ValidationException( - "the `inputBinding` field is not valid because:", - SourceLine(_doc, "inputBinding", str), - [e], - detailed_message=f"the `inputBinding` field with value `{val}` " - "is not valid because:", - ) - ) - extension_fields: dict[str, Any] = {} - for k in _doc.keys(): - if k not in cls.attrs: - if not k: - _errors__.append( - ValidationException("mapping with implicit null key") - ) - elif ":" in k: - ex = expand_url( - k, "", loadingOptions, scoped_id=False, vocab_term=False - ) - extension_fields[ex] = _doc[k] - else: - _errors__.append( - ValidationException( - "invalid field `{}`, expected one of: `inputBinding`".format( - k - ), - SourceLine(_doc, k, str), - ) - ) - - if _errors__: - raise ValidationException("", None, _errors__, "*") - _constructed = cls( - inputBinding=inputBinding, - extension_fields=extension_fields, - loadingOptions=loadingOptions, - ) - return _constructed - - def save( - self, top: bool = False, base_url: str = "", relative_uris: bool = True - ) -> dict[str, Any]: - r: dict[str, Any] = {} - - if relative_uris: - for ef in self.extension_fields: - r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] - else: - for ef in self.extension_fields: - r[ef] = self.extension_fields[ef] - if self.inputBinding is not None: - r["inputBinding"] = save( - self.inputBinding, - top=False, - base_url=base_url, - relative_uris=relative_uris, - ) - - # top refers to the directory level - if top: - if self.loadingOptions.namespaces: - r["$namespaces"] = self.loadingOptions.namespaces - if self.loadingOptions.schemas: - r["$schemas"] = self.loadingOptions.schemas - return r - - attrs = frozenset(["inputBinding"]) + pass class CommandInputRecordField(InputRecordField, CommandLineBindable): @@ -13728,12 +13604,12 @@ class CommandInputParameter(InputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = 
None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, loadContents: Optional[Any] = None, loadListing: Optional[Any] = None, @@ -13815,7 +13691,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -13863,7 +13739,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -14482,12 +14358,12 @@ class CommandOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, outputBinding: Optional[Any] = None, extension_fields: Optional[dict[str, Any]] = None, @@ -14557,7 +14433,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -14605,7 +14481,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -19234,12 +19110,12 @@ class ExpressionToolOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, extension_fields: Optional[dict[str, Any]] = None, loadingOptions: Optional[LoadingOptions] = None, @@ -19305,7 +19181,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - 
uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -19353,7 +19229,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -19739,12 +19615,12 @@ class WorkflowInputParameter(InputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, loadContents: Optional[Any] = None, loadListing: Optional[Any] = None, @@ -19826,7 +19702,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -19874,7 +19750,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -21165,12 +21041,12 @@ class WorkflowOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, outputSource: Optional[Any] = None, linkMerge: Optional[Any] = None, @@ -21244,7 +21120,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -21292,7 +21168,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) 
label = None @@ -21790,7 +21666,7 @@ class Sink(Saveable): pass -class WorkflowStepInput(Identified, Sink, LoadContents, Labeled): +class WorkflowStepInput(IdentifierRequired, Sink, LoadContents, Labeled): """ The input of a workflow step connects an upstream parameter (from the workflow inputs, or the outputs of other workflows steps) with the input @@ -21842,7 +21718,7 @@ class WorkflowStepInput(Identified, Sink, LoadContents, Labeled): def __init__( self, - id: Optional[Any] = None, + id: Any, source: Optional[Any] = None, linkMerge: Optional[Any] = None, loadContents: Optional[Any] = None, @@ -21917,7 +21793,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -21965,7 +21841,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) source = None @@ -22406,7 +22282,7 @@ def save( ) -class WorkflowStepOutput(Identified): +class WorkflowStepOutput(IdentifierRequired): """ Associate an output parameter of the underlying process with a workflow parameter. 
The workflow parameter (given in the `id` field) be may be used @@ -22423,7 +22299,7 @@ class WorkflowStepOutput(Identified): def __init__( self, - id: Optional[Any] = None, + id: Any, extension_fields: Optional[dict[str, Any]] = None, loadingOptions: Optional[LoadingOptions] = None, ) -> None: @@ -22464,7 +22340,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -22512,7 +22388,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) extension_fields: dict[str, Any] = {} @@ -22571,7 +22447,7 @@ def save( attrs = frozenset(["id"]) -class WorkflowStep(Identified, Labeled, Documented): +class WorkflowStep(IdentifierRequired, Labeled, Documented): """ A workflow step is an executable element of a workflow. 
It specifies the underlying process implementation (such as `CommandLineTool` or another @@ -22635,10 +22511,10 @@ class WorkflowStep(Identified, Labeled, Documented): def __init__( self, + id: Any, in_: Any, out: Any, run: Any, - id: Optional[Any] = None, label: Optional[Any] = None, doc: Optional[Any] = None, requirements: Optional[Any] = None, @@ -22718,7 +22594,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -22766,7 +22642,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -26080,6 +25956,7 @@ def save( "File": "https://w3id.org/cwl/cwl#File", "IOSchema": "https://w3id.org/cwl/cwl#IOSchema", "Identified": "https://w3id.org/cwl/cwl#Identified", + "IdentifierRequired": "https://w3id.org/cwl/cwl#IdentifierRequired", "InitialWorkDirRequirement": "https://w3id.org/cwl/cwl#InitialWorkDirRequirement", "InlineJavascriptRequirement": "https://w3id.org/cwl/cwl#InlineJavascriptRequirement", "InplaceUpdateRequirement": "https://w3id.org/cwl/cwl#InplaceUpdateRequirement", @@ -26215,6 +26092,7 @@ def save( "https://w3id.org/cwl/cwl#File": "File", "https://w3id.org/cwl/cwl#IOSchema": "IOSchema", "https://w3id.org/cwl/cwl#Identified": "Identified", + "https://w3id.org/cwl/cwl#IdentifierRequired": "IdentifierRequired", "https://w3id.org/cwl/cwl#InitialWorkDirRequirement": "InitialWorkDirRequirement", "https://w3id.org/cwl/cwl#InlineJavascriptRequirement": "InlineJavascriptRequirement", "https://w3id.org/cwl/cwl#InplaceUpdateRequirement": "InplaceUpdateRequirement", @@ -26512,7 +26390,6 @@ def save( EnvironmentDefLoader = _RecordLoader(EnvironmentDef, None, None) CommandLineBindingLoader = _RecordLoader(CommandLineBinding, None, None) CommandOutputBindingLoader = 
_RecordLoader(CommandOutputBinding, None, None) -CommandLineBindableLoader = _RecordLoader(CommandLineBindable, None, None) CommandInputRecordFieldLoader = _RecordLoader(CommandInputRecordField, None, None) CommandInputRecordSchemaLoader = _RecordLoader(CommandInputRecordSchema, None, None) CommandInputEnumSchemaLoader = _RecordLoader(CommandInputEnumSchema, None, None) diff --git a/cwl_utils/parser/cwl_v1_1_utils.py b/cwl_utils/parser/cwl_v1_1_utils.py index 1d9ea49e..79ce337b 100644 --- a/cwl_utils/parser/cwl_v1_1_utils.py +++ b/cwl_utils/parser/cwl_v1_1_utils.py @@ -5,7 +5,7 @@ from collections.abc import MutableMapping, MutableSequence from io import StringIO from pathlib import Path -from typing import IO, Any, Union, cast +from typing import Any, IO, cast from urllib.parse import urldefrag from schema_salad.exceptions import ValidationException @@ -48,12 +48,8 @@ def _compare_records( _logger.info( "Record comparison failure for %s and %s\n" "Did not match fields for %s: %s and %s", - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], src - ).name, - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], sink - ).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, src).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, sink).name, key, srcfields.get(key), sinkfields.get(key), @@ -79,8 +75,8 @@ def _compare_type(type1: Any, type2: Any) -> bool: case MutableSequence(), MutableSequence(): if len(type1) != len(type2): return False - for t1 in type1: - if not any(_compare_type(t1, t2) for t2 in type2): + for t3 in type1: + if not any(_compare_type(t3, t2) for t2 in type2): return False return True return bool(type1 == type2) @@ -443,7 +439,7 @@ def type_for_source( """Determine the type for the given sourcenames.""" scatter_context: list[tuple[int, str] | None] = [] params = param_for_source_id(process, sourcenames, parent, scatter_context) - if not isinstance(params, list): + if not 
isinstance(params, MutableSequence): new_type = params.type_ if scatter_context[0] is not None: if scatter_context[0][1] == "nested_crossproduct": @@ -491,11 +487,24 @@ def param_for_source_id( sourcenames: str | list[str], parent: cwl.Workflow | None = None, scatter_context: list[tuple[int, str] | None] | None = None, -) -> list[cwl.WorkflowInputParameter] | cwl.WorkflowInputParameter: +) -> ( + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + | MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] +): """Find the process input parameter that matches one of the given sourcenames.""" if isinstance(sourcenames, str): sourcenames = [sourcenames] - params: list[cwl.WorkflowInputParameter] = [] + params: MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] = [] for sourcename in sourcenames: if not isinstance(process, cwl.Workflow): for param in process.inputs: diff --git a/cwl_utils/parser/cwl_v1_2.py b/cwl_utils/parser/cwl_v1_2.py index b24ac23c..85f744fc 100644 --- a/cwl_utils/parser/cwl_v1_2.py +++ b/cwl_utils/parser/cwl_v1_2.py @@ -14,7 +14,7 @@ from collections.abc import MutableMapping, MutableSequence, Sequence from io import StringIO from itertools import chain -from typing import Any, Optional, Union, cast +from typing import Any, Final, Optional, Union, cast from urllib.parse import quote, urldefrag, urlparse, urlsplit, urlunsplit from urllib.request import pathname2url @@ -30,28 +30,28 @@ _vocab: dict[str, str] = {} _rvocab: dict[str, str] = {} -_logger = logging.getLogger("salad") +_logger: Final = logging.getLogger("salad") IdxType = MutableMapping[str, tuple[Any, "LoadingOptions"]] class LoadingOptions: - idx: IdxType - fileuri: Optional[str] - baseuri: str - namespaces: MutableMapping[str, str] - schemas: MutableSequence[str] - original_doc: Optional[Any] - addl_metadata: MutableMapping[str, Any] - 
fetcher: Fetcher - vocab: dict[str, str] - rvocab: dict[str, str] - cache: CacheType - imports: list[str] - includes: list[str] - no_link_check: Optional[bool] - container: Optional[str] + idx: Final[IdxType] + fileuri: Final[Optional[str]] + baseuri: Final[str] + namespaces: Final[MutableMapping[str, str]] + schemas: Final[MutableSequence[str]] + original_doc: Final[Optional[Any]] + addl_metadata: Final[MutableMapping[str, Any]] + fetcher: Final[Fetcher] + vocab: Final[dict[str, str]] + rvocab: Final[dict[str, str]] + cache: Final[CacheType] + imports: Final[list[str]] + includes: Final[list[str]] + no_link_check: Final[Optional[bool]] + container: Final[Optional[str]] def __init__( self, @@ -73,59 +73,69 @@ def __init__( self.original_doc = original_doc if idx is not None: - self.idx = idx + temp_idx = idx else: - self.idx = copyfrom.idx if copyfrom is not None else {} + temp_idx = copyfrom.idx if copyfrom is not None else {} + self.idx = temp_idx if fileuri is not None: - self.fileuri = fileuri + temp_fileuri: Optional[str] = fileuri else: - self.fileuri = copyfrom.fileuri if copyfrom is not None else None + temp_fileuri = copyfrom.fileuri if copyfrom is not None else None + self.fileuri = temp_fileuri if baseuri is not None: - self.baseuri = baseuri + temp_baseuri = baseuri else: - self.baseuri = copyfrom.baseuri if copyfrom is not None else "" + temp_baseuri = copyfrom.baseuri if copyfrom is not None else "" + self.baseuri = temp_baseuri if namespaces is not None: - self.namespaces = namespaces + temp_namespaces: MutableMapping[str, str] = namespaces else: - self.namespaces = copyfrom.namespaces if copyfrom is not None else {} + temp_namespaces = copyfrom.namespaces if copyfrom is not None else {} + self.namespaces = temp_namespaces if schemas is not None: - self.schemas = schemas + temp_schemas: MutableSequence[str] = schemas else: - self.schemas = copyfrom.schemas if copyfrom is not None else [] + temp_schemas = copyfrom.schemas if copyfrom is not None else 
[] + self.schemas = temp_schemas if addl_metadata is not None: - self.addl_metadata = addl_metadata + temp_addl_metadata: MutableMapping[str, Any] = addl_metadata else: - self.addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + temp_addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} + self.addl_metadata = temp_addl_metadata if imports is not None: - self.imports = imports + temp_imports = imports else: - self.imports = copyfrom.imports if copyfrom is not None else [] + temp_imports = copyfrom.imports if copyfrom is not None else [] + self.imports = temp_imports if includes is not None: - self.includes = includes + temp_includes = includes else: - self.includes = copyfrom.includes if copyfrom is not None else [] + temp_includes = copyfrom.includes if copyfrom is not None else [] + self.includes = temp_includes if no_link_check is not None: - self.no_link_check = no_link_check + temp_no_link_check: Optional[bool] = no_link_check else: - self.no_link_check = copyfrom.no_link_check if copyfrom is not None else False + temp_no_link_check = copyfrom.no_link_check if copyfrom is not None else False + self.no_link_check = temp_no_link_check if container is not None: - self.container = container + temp_container: Optional[str] = container else: - self.container = copyfrom.container if copyfrom is not None else None + temp_container = copyfrom.container if copyfrom is not None else None + self.container = temp_container if fetcher is not None: - self.fetcher = fetcher + temp_fetcher = fetcher elif copyfrom is not None: - self.fetcher = copyfrom.fetcher + temp_fetcher = copyfrom.fetcher else: import requests from cachecontrol.caches import SeparateBodyFileCache @@ -136,19 +146,22 @@ def __init__( requests.Session(), cache=SeparateBodyFileCache(root / ".cache" / "salad"), ) - self.fetcher: Fetcher = DefaultFetcher({}, session) + temp_fetcher = DefaultFetcher({}, session) + self.fetcher = temp_fetcher self.cache = self.fetcher.cache if 
isinstance(self.fetcher, MemoryCachingFetcher) else {} - self.vocab = _vocab - self.rvocab = _rvocab - - if self.namespaces is not None: - self.vocab = self.vocab.copy() - self.rvocab = self.rvocab.copy() + if self.namespaces != {}: + temp_vocab = _vocab.copy() + temp_rvocab = _rvocab.copy() for k, v in self.namespaces.items(): - self.vocab[k] = v - self.rvocab[v] = k + temp_vocab[k] = v + temp_rvocab[v] = k + else: + temp_vocab = _vocab + temp_rvocab = _rvocab + self.vocab = temp_vocab + self.rvocab = temp_rvocab @property def graph(self) -> Graph: @@ -156,7 +169,7 @@ def graph(self) -> Graph: graph = Graph() if not self.schemas: return graph - key = str(hash(tuple(self.schemas))) + key: Final = str(hash(tuple(self.schemas))) if key in self.cache: return cast(Graph, self.cache[key]) for schema in self.schemas: @@ -221,20 +234,20 @@ def load_field( if "$import" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) + url1: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]) result, metadata = _document_load_by_url( fieldtype, - url, + url1, loadingOptions, ) - loadingOptions.imports.append(url) + loadingOptions.imports.append(url1) return result if "$include" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") - url = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) - val = loadingOptions.fetcher.fetch_text(url) - loadingOptions.includes.append(url) + url2: Final = loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) + val = loadingOptions.fetcher.fetch_text(url2) + loadingOptions.includes.append(url2) return fieldtype.load(val, baseuri, loadingOptions, lc=lc) @@ -243,7 +256,7 @@ def load_field( def extract_type(val_type: type[Any]) -> str: """Take a type of value, and extracts the value as a string.""" - 
val_str = str(val_type) + val_str: Final = str(val_type) return val_str.split("'")[1] @@ -264,10 +277,10 @@ def parse_errors(error_message: str) -> tuple[str, str, str]: """Parse error messages from several loaders into one error message.""" if not error_message.startswith("Expected"): return error_message, "", "" - vals = error_message.split("\n") + vals: Final = error_message.split("\n") if len(vals) == 1: return error_message, "", "" - types = set() + types1: Final = set() for val in vals: individual_vals = val.split(" ") if val == "": @@ -275,27 +288,29 @@ def parse_errors(error_message: str) -> tuple[str, str, str]: if individual_vals[1] == "one": individual_vals = val.split("(")[1].split(",") for t in individual_vals: - types.add(t.strip(" ").strip(")\n")) + types1.add(t.strip(" ").strip(")\n")) elif individual_vals[2] == "<class": - types.add(individual_vals[3].strip(">").replace("'", "")) + types1.add(individual_vals[3].strip(">").replace("'", "")) elif individual_vals[0] == "Value": - types.add(individual_vals[-1].strip(".")) + types1.add(individual_vals[-1].strip(".")) else: - types.add(individual_vals[1].replace(",", "")) - types = {val for val in types if val != "NoneType"} - if "str" in types: - types = {convert_typing(val) for val in types if "'" not in val} + types1.add(individual_vals[1].replace(",", "")) + types2: Final = {val for val in types1 if val != "NoneType"} + if "str" in types2: + types3 = {convert_typing(val) for val in types2 if "'" not in val} + else: + types3 = types2 to_print = "" - for val in types: + for val in types3: if "'" in val: - to_print = "value" if len(types) == 1 else "values" + to_print = "value" if len(types3) == 1 else "values" if to_print == "": - to_print = "type" if len(types) == 1 else "types" + to_print = "type" if len(types3) == 1 else "types" - verb_tensage = "is" if len(types) == 1 else "are" + verb_tensage: Final = "is" if len(types3) == 1 else "are" - return str(types).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage + return 
str(types3).replace("{", "(").replace("}", ")").replace("'", ""), to_print, verb_tensage def save( @@ -309,7 +324,7 @@ def save( if isinstance(val, MutableSequence): return [save(v, top=False, base_url=base_url, relative_uris=relative_uris) for v in val] if isinstance(val, MutableMapping): - newdict = {} + newdict: Final = {} for key in val: newdict[key] = save(val[key], top=False, base_url=base_url, relative_uris=relative_uris) return newdict @@ -326,7 +341,7 @@ def save_with_metadata( relative_uris: bool = True, ) -> save_type: """Save and set $namespaces, $schemas, $base and any other metadata fields at the top level.""" - saved_val = save(val, top, base_url, relative_uris) + saved_val: Final = save(val, top, base_url, relative_uris) newdict: MutableMapping[str, Any] = {} if isinstance(saved_val, MutableSequence): newdict = {"$graph": saved_val} @@ -361,30 +376,30 @@ def expand_url( return url if bool(loadingOptions.vocab) and ":" in url: - prefix = url.split(":")[0] + prefix: Final = url.split(":")[0] if prefix in loadingOptions.vocab: url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :] - split = urlsplit(url) + split1: Final = urlsplit(url) if ( - (bool(split.scheme) and split.scheme in loadingOptions.fetcher.supported_schemes()) + (bool(split1.scheme) and split1.scheme in loadingOptions.fetcher.supported_schemes()) or url.startswith("$(") or url.startswith("${") ): pass - elif scoped_id and not bool(split.fragment): - splitbase = urlsplit(base_url) - frg = "" - if bool(splitbase.fragment): - frg = splitbase.fragment + "/" + split.path + elif scoped_id and not bool(split1.fragment): + splitbase1: Final = urlsplit(base_url) + frg: str + if bool(splitbase1.fragment): + frg = splitbase1.fragment + "/" + split1.path else: - frg = split.path - pt = splitbase.path if splitbase.path != "" else "/" - url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)) - elif scoped_ref is not None and not bool(split.fragment): - splitbase = 
urlsplit(base_url) - sp = splitbase.fragment.split("/") + frg = split1.path + pt: Final = splitbase1.path if splitbase1.path != "" else "/" + url = urlunsplit((splitbase1.scheme, splitbase1.netloc, pt, splitbase1.query, frg)) + elif scoped_ref is not None and not bool(split1.fragment): + splitbase2: Final = urlsplit(base_url) + sp = splitbase2.fragment.split("/") n = scoped_ref while n > 0 and len(sp) > 0: sp.pop() @@ -392,10 +407,10 @@ def expand_url( sp.append(url) url = urlunsplit( ( - splitbase.scheme, - splitbase.netloc, - splitbase.path, - splitbase.query, + splitbase2.scheme, + splitbase2.netloc, + splitbase2.path, + splitbase2.query, "/".join(sp), ) ) @@ -403,8 +418,8 @@ def expand_url( url = loadingOptions.fetcher.urljoin(base_url, url) if vocab_term: - split = urlsplit(url) - if bool(split.scheme): + split2: Final = urlsplit(url) + if bool(split2.scheme): if url in loadingOptions.rvocab: return loadingOptions.rvocab[url] else: @@ -441,7 +456,7 @@ def load( class _PrimitiveLoader(_Loader): def __init__(self, tp: Union[type, tuple[type[str], type[str]]]) -> None: - self.tp = tp + self.tp: Final = tp def load( self, @@ -461,7 +476,7 @@ def __repr__(self) -> str: class _ArrayLoader(_Loader): def __init__(self, items: _Loader) -> None: - self.items = items + self.items: Final = items def load( self, @@ -476,9 +491,9 @@ def load( f"Value is a {convert_typing(extract_type(type(doc)))}, " f"but valid type for this field is an array." 
) - r: list[Any] = [] - errors: list[SchemaSaladException] = [] - fields: list[str] = [] + r: Final[list[Any]] = [] + errors: Final[list[SchemaSaladException]] = [] + fields: Final[list[str]] = [] for i in range(0, len(doc)): try: lf = load_field( @@ -524,10 +539,10 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.values = values - self.name = name - self.container = container - self.no_link_check = no_link_check + self.values: Final = values + self.name: Final = name + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -543,8 +558,8 @@ def load( loadingOptions = LoadingOptions( copyfrom=loadingOptions, container=self.container, no_link_check=self.no_link_check ) - r: dict[str, Any] = {} - errors: list[SchemaSaladException] = [] + r: Final[dict[str, Any]] = {} + errors: Final[list[SchemaSaladException]] = [] for k, v in doc.items(): try: lf = load_field(v, self.values, baseuri, loadingOptions, lc) @@ -561,8 +576,8 @@ def __repr__(self) -> str: class _EnumLoader(_Loader): def __init__(self, symbols: Sequence[str], name: str) -> None: - self.symbols = symbols - self.name = name + self.symbols: Final = symbols + self.name: Final = name def load( self, @@ -582,7 +597,7 @@ def __repr__(self) -> str: class _SecondaryDSLLoader(_Loader): def __init__(self, inner: _Loader) -> None: - self.inner = inner + self.inner: Final = inner def load( self, @@ -592,7 +607,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - r: list[dict[str, Any]] = [] + r: Final[list[dict[str, Any]]] = [] if isinstance(doc, MutableSequence): for d in doc: if isinstance(d, str): @@ -601,15 +616,15 @@ def load( else: r.append({"pattern": d}) elif isinstance(d, dict): - new_dict: dict[str, Any] = {} + new_dict1: dict[str, Any] = {} dict_copy = copy.deepcopy(d) if "pattern" in dict_copy: - new_dict["pattern"] = dict_copy.pop("pattern") + new_dict1["pattern"] = 
dict_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {d}" ) - new_dict["required"] = ( + new_dict1["required"] = ( dict_copy.pop("required") if "required" in dict_copy else None ) @@ -619,28 +634,28 @@ def load( dict_copy ) ) - r.append(new_dict) + r.append(new_dict1) else: raise ValidationException( "Expected a string or sequence of (strings or mappings)." ) elif isinstance(doc, MutableMapping): - new_dict = {} - doc_copy = copy.deepcopy(doc) + new_dict2: Final = {} + doc_copy: Final = copy.deepcopy(doc) if "pattern" in doc_copy: - new_dict["pattern"] = doc_copy.pop("pattern") + new_dict2["pattern"] = doc_copy.pop("pattern") else: raise ValidationException( f"Missing pattern in secondaryFiles specification entry: {doc}" ) - new_dict["required"] = doc_copy.pop("required") if "required" in doc_copy else None + new_dict2["required"] = doc_copy.pop("required") if "required" in doc_copy else None if len(doc_copy): raise ValidationException( f"Unallowed values in secondaryFiles specification entry: {doc_copy}" ) - r.append(new_dict) + r.append(new_dict2) elif isinstance(doc, str): if doc.endswith("?"): @@ -659,9 +674,9 @@ def __init__( container: Optional[str] = None, no_link_check: Optional[bool] = None, ) -> None: - self.classtype = classtype - self.container = container - self.no_link_check = no_link_check + self.classtype: Final = classtype + self.container: Final = container + self.no_link_check: Final = no_link_check def load( self, @@ -688,7 +703,7 @@ def __repr__(self) -> str: class _ExpressionLoader(_Loader): def __init__(self, items: type[str]) -> None: - self.items = items + self.items: Final = items def load( self, @@ -709,7 +724,7 @@ def load( class _UnionLoader(_Loader): def __init__(self, alternates: Sequence[_Loader], name: Optional[str] = None) -> None: self.alternates = alternates - self.name = name + self.name: Final = name def add_loaders(self, loaders: Sequence[_Loader]) -> None: 
self.alternates = tuple(loader for loader in chain(self.alternates, loaders)) @@ -722,7 +737,7 @@ def load( docRoot: Optional[str] = None, lc: Optional[list[Any]] = None, ) -> Any: - errors = [] + errors: Final = [] if lc is None: lc = [] @@ -805,11 +820,11 @@ def __init__( scoped_ref: Optional[int], no_link_check: Optional[bool], ) -> None: - self.inner = inner - self.scoped_id = scoped_id - self.vocab_term = vocab_term - self.scoped_ref = scoped_ref - self.no_link_check = no_link_check + self.inner: Final = inner + self.scoped_id: Final = scoped_id + self.vocab_term: Final = vocab_term + self.scoped_ref: Final = scoped_ref + self.no_link_check: Final = no_link_check def load( self, @@ -824,7 +839,7 @@ def load( copyfrom=loadingOptions, no_link_check=self.no_link_check ) if isinstance(doc, MutableSequence): - newdoc = [] + newdoc: Final = [] for i in doc: if isinstance(i, str): newdoc.append( @@ -851,7 +866,7 @@ def load( ) if isinstance(doc, str): if not loadingOptions.no_link_check: - errors = [] + errors: Final = [] try: if not loadingOptions.fetcher.check_exists(doc): errors.append( @@ -866,9 +881,9 @@ def load( class _TypeDSLLoader(_Loader): def __init__(self, inner: _Loader, refScope: Optional[int], salad_version: str) -> None: - self.inner = inner - self.refScope = refScope - self.salad_version = salad_version + self.inner: Final = inner + self.refScope: Final = refScope + self.salad_version: Final = salad_version def resolve( self, @@ -883,9 +898,9 @@ def resolve( doc_ = doc_[0:-1] if doc_.endswith("[]"): - salad_versions = [int(v) for v in self.salad_version[1:].split(".")] + salad_versions: Final = [int(v) for v in self.salad_version[1:].split(".")] items: Union[list[Union[dict[str, Any], str]], dict[str, Any], str] = "" - rest = doc_[0:-2] + rest: Final = doc_[0:-2] if salad_versions < [1, 3]: if rest.endswith("[]"): # To show the error message with the original type @@ -914,7 +929,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if 
isinstance(doc, MutableSequence): - r: list[Any] = [] + r: Final[list[Any]] = [] for d in doc: if isinstance(d, str): resolved = self.resolve(d, baseuri, loadingOptions) @@ -936,9 +951,9 @@ def load( class _IdMapLoader(_Loader): def __init__(self, inner: _Loader, mapSubject: str, mapPredicate: Optional[str]) -> None: - self.inner = inner - self.mapSubject = mapSubject - self.mapPredicate = mapPredicate + self.inner: Final = inner + self.mapSubject: Final = mapSubject + self.mapPredicate: Final = mapPredicate def load( self, @@ -949,7 +964,7 @@ def load( lc: Optional[list[Any]] = None, ) -> Any: if isinstance(doc, MutableMapping): - r: list[Any] = [] + r: Final[list[Any]] = [] for k in doc.keys(): val = doc[k] if isinstance(val, CommentedMap): @@ -989,13 +1004,13 @@ def _document_load( ) if isinstance(doc, MutableMapping): - addl_metadata = {} + addl_metadata: Final = {} if addl_metadata_fields is not None: for mf in addl_metadata_fields: if mf in doc: addl_metadata[mf] = doc[mf] - docuri = baseuri + docuri: Final = baseuri if "$base" in doc: baseuri = doc["$base"] @@ -1007,22 +1022,22 @@ def _document_load( addl_metadata=addl_metadata, ) - doc = copy.copy(doc) - if "$namespaces" in doc: - doc.pop("$namespaces") - if "$schemas" in doc: - doc.pop("$schemas") - if "$base" in doc: - doc.pop("$base") + doc2: Final = copy.copy(doc) + if "$namespaces" in doc2: + doc2.pop("$namespaces") + if "$schemas" in doc2: + doc2.pop("$schemas") + if "$base" in doc2: + doc2.pop("$base") - if "$graph" in doc: + if "$graph" in doc2: loadingOptions.idx[baseuri] = ( - loader.load(doc["$graph"], baseuri, loadingOptions), + loader.load(doc2["$graph"], baseuri, loadingOptions), loadingOptions, ) else: loadingOptions.idx[baseuri] = ( - loader.load(doc, baseuri, loadingOptions, docRoot=baseuri), + loader.load(doc2, baseuri, loadingOptions, docRoot=baseuri), loadingOptions, ) @@ -1054,11 +1069,11 @@ def _document_load_by_url( doc_url, frg = urldefrag(url) - text = 
loadingOptions.fetcher.fetch_text(doc_url) - textIO = StringIO(text) + text: Final = loadingOptions.fetcher.fetch_text(doc_url) + textIO: Final = StringIO(text) textIO.name = str(doc_url) - yaml = yaml_no_ts() - result = yaml.load(textIO) + yaml: Final = yaml_no_ts() + result: Final = yaml.load(textIO) add_lc_filename(result, doc_url) loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=doc_url) @@ -1079,7 +1094,7 @@ def file_uri(path: str, split_frag: bool = False) -> str: if path.startswith("file://"): return path if split_frag: - pathsp = path.split("#", 2) + pathsp: Final = path.split("#", 2) frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else "" urlpath = pathname2url(str(pathsp[0])) else: @@ -1111,8 +1126,8 @@ def save_relative_uri( elif isinstance(uri, str): if not relative_uris or uri == base_url: return uri - urisplit = urlsplit(uri) - basesplit = urlsplit(base_url) + urisplit: Final = urlsplit(uri) + basesplit: Final = urlsplit(base_url) if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc: if urisplit.path != basesplit.path: p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path)) @@ -1143,7 +1158,7 @@ def shortname(inputid: str) -> str: See https://w3id.org/cwl/v1.2/SchemaSalad.html#Short_names. """ - parsed_id = urlparse(inputid) + parsed_id: Final = urlparse(inputid) if parsed_id.fragment: return parsed_id.fragment.split("/")[-1] return parsed_id.path.split("/")[-1] @@ -4378,6 +4393,10 @@ class Identified(Saveable): pass +class IdentifierRequired(Identified): + pass + + class LoadContents(Saveable): pass @@ -4394,7 +4413,7 @@ class OutputFormat(Saveable): pass -class Parameter(FieldBase, Documented, Identified): +class Parameter(FieldBase, Documented, IdentifierRequired): """ Define an input or output parameter to a process. 
@@ -9863,150 +9882,7 @@ def save( class CommandLineBindable(Saveable): - def __init__( - self, - inputBinding: Optional[Any] = None, - extension_fields: Optional[dict[str, Any]] = None, - loadingOptions: Optional[LoadingOptions] = None, - ) -> None: - if extension_fields: - self.extension_fields = extension_fields - else: - self.extension_fields = CommentedMap() - if loadingOptions: - self.loadingOptions = loadingOptions - else: - self.loadingOptions = LoadingOptions() - self.inputBinding = inputBinding - - def __eq__(self, other: Any) -> bool: - if isinstance(other, CommandLineBindable): - return bool(self.inputBinding == other.inputBinding) - return False - - def __hash__(self) -> int: - return hash((self.inputBinding)) - - @classmethod - def fromDoc( - cls, - doc: Any, - baseuri: str, - loadingOptions: LoadingOptions, - docRoot: Optional[str] = None - ) -> "CommandLineBindable": - _doc = copy.copy(doc) - - if hasattr(doc, "lc"): - _doc.lc.data = doc.lc.data - _doc.lc.filename = doc.lc.filename - _errors__ = [] - inputBinding = None - if "inputBinding" in _doc: - try: - inputBinding = load_field( - _doc.get("inputBinding"), - union_of_None_type_or_CommandLineBindingLoader, - baseuri, - loadingOptions, - lc=_doc.get("inputBinding") - ) - - except ValidationException as e: - error_message, to_print, verb_tensage = parse_errors(str(e)) - - if str(e) == "missing required field `inputBinding`": - _errors__.append( - ValidationException( - str(e), - None - ) - ) - else: - val = _doc.get("inputBinding") - if error_message != str(e): - val_type = convert_typing(extract_type(type(val))) - _errors__.append( - ValidationException( - "the `inputBinding` field is not valid because:", - SourceLine(_doc, "inputBinding", str), - [ValidationException(f"Value is a {val_type}, " - f"but valid {to_print} for this field " - f"{verb_tensage} {error_message}", - detailed_message=f"Value `{val}` is a {val_type}, " - f"but valid {to_print} for this field " - f"{verb_tensage} 
{error_message}")], - ) - ) - else: - _errors__.append( - ValidationException( - "the `inputBinding` field is not valid because:", - SourceLine(_doc, "inputBinding", str), - [e], - detailed_message=f"the `inputBinding` field with value `{val}` " - "is not valid because:", - ) - ) - extension_fields: dict[str, Any] = {} - for k in _doc.keys(): - if k not in cls.attrs: - if not k: - _errors__.append( - ValidationException("mapping with implicit null key") - ) - elif ":" in k: - ex = expand_url( - k, "", loadingOptions, scoped_id=False, vocab_term=False - ) - extension_fields[ex] = _doc[k] - else: - _errors__.append( - ValidationException( - "invalid field `{}`, expected one of: `inputBinding`".format( - k - ), - SourceLine(_doc, k, str), - ) - ) - - if _errors__: - raise ValidationException("", None, _errors__, "*") - _constructed = cls( - inputBinding=inputBinding, - extension_fields=extension_fields, - loadingOptions=loadingOptions, - ) - return _constructed - - def save( - self, top: bool = False, base_url: str = "", relative_uris: bool = True - ) -> dict[str, Any]: - r: dict[str, Any] = {} - - if relative_uris: - for ef in self.extension_fields: - r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] - else: - for ef in self.extension_fields: - r[ef] = self.extension_fields[ef] - if self.inputBinding is not None: - r["inputBinding"] = save( - self.inputBinding, - top=False, - base_url=base_url, - relative_uris=relative_uris, - ) - - # top refers to the directory level - if top: - if self.loadingOptions.namespaces: - r["$namespaces"] = self.loadingOptions.namespaces - if self.loadingOptions.schemas: - r["$schemas"] = self.loadingOptions.schemas - return r - - attrs = frozenset(["inputBinding"]) + pass class CommandInputRecordField(InputRecordField, CommandLineBindable): @@ -13750,12 +13626,12 @@ class CommandInputParameter(InputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = 
None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, loadContents: Optional[Any] = None, loadListing: Optional[Any] = None, @@ -13837,7 +13713,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -13885,7 +13761,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -14504,12 +14380,12 @@ class CommandOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, outputBinding: Optional[Any] = None, extension_fields: Optional[dict[str, Any]] = None, @@ -14579,7 +14455,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -14627,7 +14503,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -19322,12 +19198,12 @@ class ExpressionToolOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, extension_fields: Optional[dict[str, Any]] = None, loadingOptions: Optional[LoadingOptions] = None, @@ -19393,7 +19269,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - 
uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -19441,7 +19317,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -19827,12 +19703,12 @@ class WorkflowInputParameter(InputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, loadContents: Optional[Any] = None, loadListing: Optional[Any] = None, @@ -19914,7 +19790,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -19962,7 +19838,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -21312,12 +21188,12 @@ class WorkflowOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, outputSource: Optional[Any] = None, linkMerge: Optional[Any] = None, @@ -21395,7 +21271,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -21443,7 +21319,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) 
label = None @@ -21994,7 +21870,7 @@ class Sink(Saveable): pass -class WorkflowStepInput(Identified, Sink, LoadContents, Labeled): +class WorkflowStepInput(IdentifierRequired, Sink, LoadContents, Labeled): """ The input of a workflow step connects an upstream parameter (from the workflow inputs, or the outputs of other workflows steps) with the input @@ -22111,7 +21987,7 @@ class WorkflowStepInput(Identified, Sink, LoadContents, Labeled): def __init__( self, - id: Optional[Any] = None, + id: Any, source: Optional[Any] = None, linkMerge: Optional[Any] = None, pickValue: Optional[Any] = None, @@ -22190,7 +22066,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -22238,7 +22114,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) source = None @@ -22732,7 +22608,7 @@ def save( ) -class WorkflowStepOutput(Identified): +class WorkflowStepOutput(IdentifierRequired): """ Associate an output parameter of the underlying process with a workflow parameter. 
The workflow parameter (given in the `id` field) be may be used @@ -22749,7 +22625,7 @@ class WorkflowStepOutput(Identified): def __init__( self, - id: Optional[Any] = None, + id: Any, extension_fields: Optional[dict[str, Any]] = None, loadingOptions: Optional[LoadingOptions] = None, ) -> None: @@ -22790,7 +22666,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -22838,7 +22714,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) extension_fields: dict[str, Any] = {} @@ -22897,7 +22773,7 @@ def save( attrs = frozenset(["id"]) -class WorkflowStep(Identified, Labeled, Documented): +class WorkflowStep(IdentifierRequired, Labeled, Documented): """ A workflow step is an executable element of a workflow. 
It specifies the underlying process implementation (such as `CommandLineTool` or another @@ -22985,10 +22861,10 @@ class WorkflowStep(Identified, Labeled, Documented): def __init__( self, + id: Any, in_: Any, out: Any, run: Any, - id: Optional[Any] = None, label: Optional[Any] = None, doc: Optional[Any] = None, requirements: Optional[Any] = None, @@ -23072,7 +22948,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -23120,7 +22996,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -24972,12 +24848,12 @@ class OperationInputParameter(InputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, loadContents: Optional[Any] = None, loadListing: Optional[Any] = None, @@ -25055,7 +24931,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -25103,7 +24979,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -25667,12 +25543,12 @@ class OperationOutputParameter(OutputParameter): def __init__( self, + id: Any, type_: Any, label: Optional[Any] = None, secondaryFiles: Optional[Any] = None, streamable: Optional[Any] = None, doc: Optional[Any] = None, - id: Optional[Any] = None, format: Optional[Any] = None, extension_fields: Optional[dict[str, Any]] = None, loadingOptions: 
Optional[LoadingOptions] = None, @@ -25738,7 +25614,7 @@ def fromDoc( try: id = load_field( _doc.get("id"), - uri_union_of_None_type_or_strtype_True_False_None_None, + uri_strtype_True_False_None_None, baseuri, loadingOptions, lc=_doc.get("id") @@ -25786,7 +25662,7 @@ def fromDoc( if docRoot is not None: id = docRoot else: - id = "_:" + str(_uuid__.uuid4()) + _errors__.append(ValidationException("missing id")) if not __original_id_is_none: baseuri = cast(str, id) label = None @@ -29219,6 +29095,7 @@ def save( "File": "https://w3id.org/cwl/cwl#File", "IOSchema": "https://w3id.org/cwl/cwl#IOSchema", "Identified": "https://w3id.org/cwl/cwl#Identified", + "IdentifierRequired": "https://w3id.org/cwl/cwl#IdentifierRequired", "InitialWorkDirRequirement": "https://w3id.org/cwl/cwl#InitialWorkDirRequirement", "InlineJavascriptRequirement": "https://w3id.org/cwl/cwl#InlineJavascriptRequirement", "InplaceUpdateRequirement": "https://w3id.org/cwl/cwl#InplaceUpdateRequirement", @@ -29371,6 +29248,7 @@ def save( "https://w3id.org/cwl/cwl#File": "File", "https://w3id.org/cwl/cwl#IOSchema": "IOSchema", "https://w3id.org/cwl/cwl#Identified": "Identified", + "https://w3id.org/cwl/cwl#IdentifierRequired": "IdentifierRequired", "https://w3id.org/cwl/cwl#InitialWorkDirRequirement": "InitialWorkDirRequirement", "https://w3id.org/cwl/cwl#InlineJavascriptRequirement": "InlineJavascriptRequirement", "https://w3id.org/cwl/cwl#InplaceUpdateRequirement": "InplaceUpdateRequirement", @@ -29693,7 +29571,6 @@ def save( EnvironmentDefLoader = _RecordLoader(EnvironmentDef, None, None) CommandLineBindingLoader = _RecordLoader(CommandLineBinding, None, None) CommandOutputBindingLoader = _RecordLoader(CommandOutputBinding, None, None) -CommandLineBindableLoader = _RecordLoader(CommandLineBindable, None, None) CommandInputRecordFieldLoader = _RecordLoader(CommandInputRecordField, None, None) CommandInputRecordSchemaLoader = _RecordLoader(CommandInputRecordSchema, None, None) 
CommandInputEnumSchemaLoader = _RecordLoader(CommandInputEnumSchema, None, None) diff --git a/cwl_utils/parser/cwl_v1_2_utils.py b/cwl_utils/parser/cwl_v1_2_utils.py index 33733b08..e40f5877 100644 --- a/cwl_utils/parser/cwl_v1_2_utils.py +++ b/cwl_utils/parser/cwl_v1_2_utils.py @@ -5,7 +5,7 @@ from collections.abc import MutableMapping, MutableSequence from io import StringIO from pathlib import Path -from typing import IO, Any, Union, cast +from typing import Any, IO, cast from urllib.parse import urldefrag from schema_salad.exceptions import ValidationException @@ -48,12 +48,8 @@ def _compare_records( _logger.info( "Record comparison failure for %s and %s\n" "Did not match fields for %s: %s and %s", - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], src - ).name, - cast( - Union[cwl.InputRecordSchema, cwl.CommandOutputRecordSchema], sink - ).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, src).name, + cast(cwl.InputRecordSchema | cwl.CommandOutputRecordSchema, sink).name, key, srcfields.get(key), sinkfields.get(key), @@ -79,8 +75,8 @@ def _compare_type(type1: Any, type2: Any) -> bool: case MutableSequence(), MutableSequence(): if len(type1) != len(type2): return False - for t1 in type1: - if not any(_compare_type(t1, t2) for t2 in type2): + for t3 in type1: + if not any(_compare_type(t3, t2) for t2 in type2): return False return True return bool(type1 == type2) @@ -225,7 +221,7 @@ def check_all_types( continue if sourceField is not None: if isinstance(sourceField, MutableSequence): - linkMerge = sink.linkMerge or ( + linkMerge: str | None = sink.linkMerge or ( "merge_nested" if len(sourceField) > 1 else None ) if sink.pickValue in ("first_non_null", "the_only_non_null"): @@ -525,7 +521,7 @@ def type_for_source( """Determine the type for the given sourcenames.""" scatter_context: list[tuple[int, str] | None] = [] params = param_for_source_id(process, sourcenames, parent, scatter_context) - if not isinstance(params, list): + 
if not isinstance(params, MutableSequence): new_type = params.type_ if scatter_context[0] is not None: if scatter_context[0][1] == "nested_crossproduct": @@ -580,11 +576,24 @@ def param_for_source_id( sourcenames: str | list[str], parent: cwl.Workflow | None = None, scatter_context: list[tuple[int, str] | None] | None = None, -) -> list[cwl.WorkflowInputParameter] | cwl.WorkflowInputParameter: +) -> ( + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + | MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] +): """Find the process input parameter that matches one of the given sourcenames.""" if isinstance(sourcenames, str): sourcenames = [sourcenames] - params: list[cwl.WorkflowInputParameter] = [] + params: MutableSequence[ + cwl.CommandInputParameter + | cwl.CommandOutputParameter + | cwl.WorkflowInputParameter + ] = [] for sourcename in sourcenames: if not isinstance(process, cwl.Workflow): for param in process.inputs: diff --git a/cwl_utils/parser/utils.py b/cwl_utils/parser/utils.py index 1823fa9b..6849afd9 100644 --- a/cwl_utils/parser/utils.py +++ b/cwl_utils/parser/utils.py @@ -5,7 +5,7 @@ from collections.abc import MutableSequence from pathlib import Path from types import ModuleType -from typing import Any, Optional, Union, cast +from typing import Any, Optional, cast from urllib.parse import unquote_plus, urlparse from schema_salad.exceptions import ValidationException @@ -14,7 +14,6 @@ import cwl_utils import cwl_utils.parser - from . 
import ( LoadingOptions, Process, @@ -339,43 +338,37 @@ def type_for_source( case "v1.0": return cwl_v1_0_utils.type_for_source( cast( - Union[ - cwl_v1_0.CommandLineTool, - cwl_v1_0.Workflow, - cwl_v1_0.ExpressionTool, - ], + cwl_v1_0.CommandLineTool + | cwl_v1_0.Workflow + | cwl_v1_0.ExpressionTool, process, ), sourcenames, - cast(Optional[cwl_v1_0.Workflow], parent), + cast(cwl_v1_0.Workflow | None, parent), linkMerge, ) case "v1.1": return cwl_v1_1_utils.type_for_source( cast( - Union[ - cwl_v1_1.CommandLineTool, - cwl_v1_1.Workflow, - cwl_v1_1.ExpressionTool, - ], + cwl_v1_1.CommandLineTool + | cwl_v1_1.Workflow + | cwl_v1_1.ExpressionTool, process, ), sourcenames, - cast(Optional[cwl_v1_1.Workflow], parent), + cast(cwl_v1_1.Workflow | None, parent), linkMerge, ) case "v1.2": return cwl_v1_2_utils.type_for_source( cast( - Union[ - cwl_v1_2.CommandLineTool, - cwl_v1_2.Workflow, - cwl_v1_2.ExpressionTool, - ], + cwl_v1_2.CommandLineTool + | cwl_v1_2.Workflow + | cwl_v1_2.ExpressionTool, process, ), sourcenames, - cast(Optional[cwl_v1_2.Workflow], parent), + cast(cwl_v1_2.Workflow | None, parent), linkMerge, pickValue, ) @@ -434,15 +427,31 @@ def param_for_source_id( scatter_context: list[tuple[int, str] | None] | None = None, ) -> ( ( - list[cwl_utils.parser.cwl_v1_0.InputParameter] + MutableSequence[ + cwl_utils.parser.cwl_v1_0.InputParameter + | cwl_utils.parser.cwl_v1_0.CommandOutputParameter + ] | cwl_utils.parser.cwl_v1_0.InputParameter + | cwl_utils.parser.cwl_v1_0.CommandOutputParameter ) | ( - list[cwl_utils.parser.cwl_v1_1.WorkflowInputParameter] + MutableSequence[ + cwl_utils.parser.cwl_v1_1.CommandInputParameter + | cwl_utils.parser.cwl_v1_1.CommandOutputParameter + | cwl_utils.parser.cwl_v1_1.WorkflowInputParameter + ] + | cwl_utils.parser.cwl_v1_1.CommandInputParameter + | cwl_utils.parser.cwl_v1_1.CommandOutputParameter | cwl_utils.parser.cwl_v1_1.WorkflowInputParameter ) | ( - list[cwl_utils.parser.cwl_v1_2.WorkflowInputParameter] + 
MutableSequence[ + cwl_utils.parser.cwl_v1_2.CommandInputParameter + | cwl_utils.parser.cwl_v1_2.CommandOutputParameter + | cwl_utils.parser.cwl_v1_2.WorkflowInputParameter + ] + | cwl_utils.parser.cwl_v1_2.CommandInputParameter + | cwl_utils.parser.cwl_v1_2.CommandOutputParameter | cwl_utils.parser.cwl_v1_2.WorkflowInputParameter ) ): @@ -450,11 +459,9 @@ def param_for_source_id( case "v1.0": return cwl_utils.parser.cwl_v1_0_utils.param_for_source_id( cast( - Union[ - cwl_utils.parser.cwl_v1_0.CommandLineTool, - cwl_utils.parser.cwl_v1_0.Workflow, - cwl_utils.parser.cwl_v1_0.ExpressionTool, - ], + cwl_utils.parser.cwl_v1_0.CommandLineTool + | cwl_utils.parser.cwl_v1_0.Workflow + | cwl_utils.parser.cwl_v1_0.ExpressionTool, process, ), sourcenames, @@ -464,11 +471,9 @@ def param_for_source_id( case "v1.1": return cwl_utils.parser.cwl_v1_1_utils.param_for_source_id( cast( - Union[ - cwl_utils.parser.cwl_v1_1.CommandLineTool, - cwl_utils.parser.cwl_v1_1.Workflow, - cwl_utils.parser.cwl_v1_1.ExpressionTool, - ], + cwl_utils.parser.cwl_v1_1.CommandLineTool + | cwl_utils.parser.cwl_v1_1.Workflow + | cwl_utils.parser.cwl_v1_1.ExpressionTool, process, ), sourcenames, @@ -478,11 +483,9 @@ def param_for_source_id( case "v1.2": return cwl_utils.parser.cwl_v1_2_utils.param_for_source_id( cast( - Union[ - cwl_utils.parser.cwl_v1_2.CommandLineTool, - cwl_utils.parser.cwl_v1_2.Workflow, - cwl_utils.parser.cwl_v1_2.ExpressionTool, - ], + cwl_utils.parser.cwl_v1_2.CommandLineTool + | cwl_utils.parser.cwl_v1_2.Workflow + | cwl_utils.parser.cwl_v1_2.ExpressionTool, process, ), sourcenames, diff --git a/pyproject.toml b/pyproject.toml index 2ec54b8e..133b5929 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,7 @@ cwl-normalizer = "cwl_utils.normalizer:main" cwl-inputs-schema-gen = "cwl_utils.inputs_schema_gen:main" [tool.pytest.ini_options] -testpaths = ["tests"] +testpaths = ["cwl_utils/tests"] addopts = "-rsx -n auto" [tool.hatch.version] @@ -113,14 +113,9 @@ 
dependencies = [ "types-setuptools>=57.4.0", ] require-runtime-dependencies = true -exclude = [ -"/create_cwl_from_objects.py", -"/load_cwl_by_path.py", -"/cwl_utils/tests", -"/cwl_utils/testdata", -] include = [ -"/cwl_utils/parser", + "/cwl_utils/parser", + "/cwl_utils/expression.py" ] [tool.hatch.envs.test]