Conversation
- Introduced a new backend option for Kubernetes in the `Backends` enum.
- Updated segmentation algorithms to accept optional keyword arguments for the Kubernetes backend, enhancing flexibility for users.
…thm execution
- Added a new section outlining how to configure and use the Kubernetes backend for running algorithms.
- Provided examples for setting the KUBECONFIG environment variable and specifying backend options in the inference method.
- Integrated the Kubernetes backend option into the BraTSAlgorithm class.
- Updated methods to accept optional keyword arguments for Kubernetes, allowing for enhanced configuration during inference.
- Added error handling to ensure Kubernetes kwargs are only used with the Kubernetes backend.
- Updated pyproject.toml to include the Kubernetes package with a minimum version of 34.1.0, enabling support for Kubernetes features in the project.
…Kubernetes configuration - Changed the parameter name from `mount_path` to `data_mount_path` in the documentation to accurately describe its function in the `infer_single` method for Kubernetes backend usage.
- Implemented functions for creating and managing Kubernetes jobs, including PVC creation, job execution, and output handling.
- Added methods for downloading additional files from Zenodo and checking file presence in pods.
- Enhanced logging for better traceability during job execution and output verification.
- Integrated command execution within pods to facilitate file uploads and downloads, ensuring smooth operation of the Kubernetes backend for algorithm inference.
…_from_pod function in kubernetes.py
…rnetes.py`. Tests cover command argument building, file handling, job creation, and PVC management for different algorithm configurations.
…`local_base_dir` parameter
…hanced Kubernetes backend configuration
/format

🤖 I will now format your code with black. Check the status here.
Pull Request Overview
This PR adds Kubernetes backend support to the BraTS orchestrator, enabling remote algorithm execution via Kubernetes Jobs as an alternative to local Docker/Singularity containers. Key changes include:
- New Kubernetes backend implementation with job orchestration, file transfer, and PVC management
- Integration of Kubernetes backend into the existing algorithm inference pipeline
- Support for configurable Kubernetes resources via the `kubernetes_kwargs` parameter
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| brats/constants.py | Added KUBERNETES enum value to Backends |
| brats/core/kubernetes.py | New module implementing Kubernetes job execution with PVC management, file transfers, and pod lifecycle management |
| brats/core/brats_algorithm.py | Updated _infer_single and _infer_batch to support kubernetes_kwargs parameter and dispatch to Kubernetes backend |
| brats/core/segmentation_algorithms.py | Added kubernetes_kwargs parameter to infer_single and infer_batch methods for both Adult and Pediatric classes |
| tests/core/test_kubernetes.py | Comprehensive test suite for Kubernetes backend functionality |
| README.md | Added documentation for Kubernetes backend usage with configuration examples |
…new packages: `cachetools`, `durationpy`, `google-auth`, `kubernetes`, `oauthlib`, `pyasn1`, `pyasn1-modules`, `requests-oauthlib`, `rsa`, and `websocket-client`. Adjust version constraints and add optional dependencies for improved functionality.
…Lesion/BraTS into 113-kubernetes-integration
… path handling based on algorithm year, and streamline command execution logging.
Pull Request Overview
Copilot reviewed 7 out of 8 changed files in this pull request and generated 7 comments.
… and modify type hints in `kubernetes.py` for clarity. Enhance logging in `_download_folder_from_pod` and add TODO comments for future security context implementation. Remove commented-out code in `run_job` and adjust test cases in `test_kubernetes.py` to reflect changes.
Pull Request Overview
Copilot reviewed 7 out of 8 changed files in this pull request and generated 11 comments.
…ity function Updated the return type of _create_finalizer_job and _create_namespaced_job functions to return job names for better usability. Introduced a new utility function _check_pod_terminal_or_running to streamline pod status checks, enhancing code readability and maintainability.
…tional checks Updated the _build_command_args function to ensure additional file paths are only appended if they exist. Enhanced the documentation for _observe_job_output and _create_namespaced_job functions to clarify return values and error handling, improving overall code clarity and usability.
/format

🤖 I will now format your code with black. Check the status here.
Pull Request Overview
Copilot reviewed 7 out of 8 changed files in this pull request and generated no new comments.
```toml
    "numpy>=1.21.0; python_version<='3.9'",
    "numpy>=1.26.0; python_version>='3.10'",
    "kubernetes>=34.1.0,<35.0.0",
]
```
[Blocker] kubernetes should be an optional dependency

`kubernetes>=34.1.0,<35.0.0` is added as a hard (unconditional) dependency. This forces ~10 new transitive packages on every user of this library, including those who never use Kubernetes.

It should be moved to `[project.optional-dependencies]` (or `[tool.poetry.extras]` in Poetry), e.g.:

```toml
[project.optional-dependencies]
kubernetes = ["kubernetes>=34.1.0,<35.0.0"]
```

Users would then install it via `pip install brats[kubernetes]`. The top-level import in brats_algorithm.py also needs to be made lazy (see related comment there) to avoid an ImportError for users without the extra.
```python
from brats.core.docker import run_container as run_docker_container
from brats.core.singularity import run_container as run_singularity_container
from brats.core.kubernetes import run_job as run_kubernetes_job
```
[Blocker] Top-level import causes ImportError for users without the kubernetes package

This import runs unconditionally at module load time. Anyone who does `from brats.core.segmentation_algorithms import ...` without having kubernetes installed will get an ImportError, even if they never use the Kubernetes backend.

The import should be deferred to where it is actually needed:

```python
# In _get_backend_runner or inside run_job dispatch:
if backend == Backends.KUBERNETES:
    from brats.core.kubernetes import run_job as run_kubernetes_job
    return run_kubernetes_job
```

This is consistent with how optional heavy dependencies are typically guarded and pairs with making kubernetes an optional install extra in pyproject.toml.
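A generic version of this guard, sketched here with a helper that turns a missing optional dependency into an actionable error. The helper name, message text, and the commented dispatch are illustrative, not part of the PR:

```python
import importlib


def lazy_import(module_name: str, install_hint: str):
    """Import a module lazily, raising an actionable error if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is not installed. Install it with: {install_hint}"
        ) from exc


# Sketch of the dispatch (names assumed from the PR, not verified):
# if backend == Backends.KUBERNETES:
#     kubernetes_mod = lazy_import("brats.core.kubernetes", "pip install brats[kubernetes]")
#     return kubernetes_mod.run_job
```

With this shape, users of the Docker or Singularity backends never touch the kubernetes package at all.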
```python
import random
import string
import base64
import docker
```
Unconditional `import docker` in a module that may not require Docker

docker is only used here for the `docker.types.DeviceRequest` type annotation in _create_namespaced_job. Importing the entire docker SDK at module level adds an unnecessary hard dependency inside kubernetes.py.

Consider using a TYPE_CHECKING guard or switching to a string annotation:

```python
from __future__ import annotations
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import docker
```

Or replace the type hint with the actual list structure (`List[Any]`), since the DeviceRequest objects are passed in from outside and are already instantiated by the caller (_handle_device_requests in docker.py).
```python
    Backends.DOCKER: run_docker_container,
    Backends.SINGULARITY: run_singularity_container,
    Backends.KUBERNETES: run_kubernetes_job,
}
```
kubernetes_kwargs is an untyped Dict — typos will surface as a confusing TypeError deep in run_job

Any misspelled key (e.g. "namespacee") will be silently passed through to run_job and produce a hard-to-diagnose error. Consider:

- Defining a `TypedDict` (or `dataclass`) for the known kwargs and annotating accordingly.
- If a `TypedDict` feels heavy, at least validate the keys against the known set before merging:

```python
VALID_KUBERNETES_KWARGS = frozenset({
    "namespace", "pvc_name", "pvc_storage_size",
    "pvc_storage_class", "job_name", "data_mount_path",
})

unknown = set(kubernetes_kwargs) - VALID_KUBERNETES_KWARGS
if unknown:
    raise ValueError(f"Unknown kubernetes_kwargs keys: {unknown}")
```

```diff
@@ -199,13 +203,22 @@ def _infer_single(
        runner = self._get_backend_runner(backend)
        if runner is None:
```
Validation order: the backend != KUBERNETES guard fires too late
_get_backend_runner(backend) is called on line 203 (before this check). If an unsupported backend was passed, the runner is None guard on line 204 already handles it. But more importantly: the intent here is to reject kubernetes_kwargs when the backend isn't Kubernetes — this should be checked before any backend-specific setup happens, not after the runner is already retrieved.
Suggested reorder:

```python
if kubernetes_kwargs is not None and backend != Backends.KUBERNETES:
    raise ValueError("kubernetes_kwargs can only be used with the Kubernetes backend.")

runner = self._get_backend_runner(backend)
if runner is None:
    raise ValueError(f"Unsupported backend: {backend}")
```

The same issue is present in _infer_batch.
```diff
@@ -246,14 +261,23 @@ def _infer_batch(
        if runner is None:
            raise ValueError(f"Unsupported backend: {backend}")
```
Duplicated kubernetes_kwargs merging block — extract to a helper
The block that validates and merges kubernetes_kwargs into runner_kwargs is copy-pasted identically in both _infer_single and _infer_batch. This violates DRY and means any future change (e.g. adding a new allowed key) must be made in two places.
Extract to a small helper:

```python
def _build_runner_kwargs(
    self, base_kwargs: dict, backend: Backends, kubernetes_kwargs: Optional[Dict]
) -> dict:
    if kubernetes_kwargs is not None:
        if backend != Backends.KUBERNETES:
            raise ValueError("kubernetes_kwargs can only be used with the Kubernetes backend.")
        return {**base_kwargs, **kubernetes_kwargs}
    return base_kwargs
```

```python
    name="job-container",
    image=image,
    volume_mounts=volume_mounts,
    # security_context=client.V1SecurityContext(run_as_user=user_id, run_as_group=group_id)
```
[Design] No cleanup of Kubernetes resources — PVCs and Jobs accumulate indefinitely
run_job creates PVCs and Jobs but never deletes them, even on success. Over time (or after repeated runs) this will leave orphaned resources in the cluster. The function should clean up after itself unless the caller explicitly opts out.
Wrap the core logic in a try/finally:

```python
try:
    # ... main body ...
finally:
    if not keep_resources:  # add a keep_resources=False param
        batch_v1_api.delete_namespaced_job(job_name, namespace, ...)
        batch_v1_api.delete_namespaced_job(job_name + "-finalizer", namespace, ...)
        core_v1_api.delete_namespaced_persistent_volume_claim(pvc_name, namespace)
        if algorithm.meta.year > 2024:
            core_v1_api.delete_namespaced_persistent_volume_claim(pvc_name + "-output", namespace)
```

The finalizer job also currently requires a manual `touch /etc/content_verified` to terminate — that sentinel mechanism means the pod runs forever if run_job crashes before reaching that point.
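The try/finally idea generalizes to `contextlib.ExitStack`, which registers each resource's teardown as soon as it is created and runs the callbacks in reverse order even if the body raises. This sketch uses plain strings in place of the actual PVC/Job API calls, so the names and events are illustrative only:

```python
from contextlib import ExitStack


def run_with_cleanup(keep_resources: bool = False) -> list:
    """Illustrative only: 'created'/'deleted' events stand in for PVC/Job API calls."""
    events = []
    with ExitStack() as stack:
        for name in ("pvc", "job", "finalizer-job"):
            events.append(f"created {name}")
            if not keep_resources:
                # Callbacks run LIFO on exit, whether the body succeeds or raises
                stack.callback(events.append, f"deleted {name}")
        events.append("ran main body")
    return events
```

The advantage over a single finally block is that a failure halfway through creation only tears down the resources that actually exist.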
```python
        pvc_name (str): Name of the PVC to create
        namespace (str): The Kubernetes namespace to create the PVC in
        storage_size (str): The size of the storage to request
        storage_class (str): The storage class to use for the PVC. If None, the default storage class will be used.
```
alpine:latest is not reproducible — pin to a specific version
alpine:latest resolves to a different image digest across runs (and will change when Alpine releases a new version). This breaks reproducibility and can introduce subtle runtime differences.
Pin to a specific version, e.g.:

```python
image="alpine:3.20"
```

Or make it a module-level constant so it can be updated in one place:

```python
_ALPINE_IMAGE = "alpine:3.20"
```

The same hardcoded tag also appears in the init-container inside _create_namespaced_job.
```python
commands = [
    "ls",
    "-la",
    str(Path(mount_path).joinpath("input", file.relative_to(path))),
```
Locale-dependent string matching for file existence check
Detecting file absence by checking for "No such file or directory" in ls output is fragile — this string is locale-dependent and will silently fail on any non-English k8s node image.
Use `test -f` and check the exit code instead:

```python
commands = ["sh", "-c", f"test -f {remote_path} && echo EXISTS || echo MISSING"]
output = _execute_command_in_pod(...)
if "MISSING" in output:
    # upload
```

Or use _execute_command_in_pod with `_preload_content=False` and inspect the return code via the websocket channel.
```python
        namespace (str): The namespace of the pod to observe the output of

    Returns:
        str: The output of the job
    """
```
Hardcoded 10-minute timeout — should be configurable
`range(300)` with `time.sleep(2)` gives a fixed 10-minute ceiling. Some BraTS algorithms run much longer; others are fast and don't need the wait. These values should be exposed as parameters with sensible defaults:

```python
def _observe_job_output(
    pod_name: str,
    namespace: str,
    timeout_seconds: int = 600,
    poll_interval: float = 2.0,
) -> str:
    for _ in range(int(timeout_seconds / poll_interval)):
        ...
        time.sleep(poll_interval)
```

The same hardcoded `range(300)` pattern appears twice more in run_job (waiting for pod ready and waiting for job completion). All three should be unified under a single configurable timeout parameter passed through from run_job.
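All three wait loops could delegate to a single generic helper along these lines (the name and signature here are illustrative, not the PR's API). Using a monotonic deadline instead of a fixed iteration count keeps the timeout honest even when the polled check itself is slow:

```python
import time
from typing import Callable


def wait_for(
    predicate: Callable[[], bool],
    timeout_seconds: float = 600.0,
    poll_interval: float = 2.0,
) -> bool:
    """Poll `predicate` until it returns True or the deadline passes.

    Returns True on success, False on timeout; the caller decides whether
    a timeout is an error (e.g. raise TimeoutError with pod context).
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
```

run_job would then pass one user-facing timeout down to all three call sites.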
```python
def _create_namespaced_job(
    job_name: str,
```
Missing activeDeadlineSeconds — sentinel-loop pods can run forever on crash
Both the init-container and the finalizer-container use while [ ! -f /etc/content_verified ]; do sleep 1; done as their entry command. If run_job crashes before executing touch /etc/content_verified, the pod will spin indefinitely and consume cluster resources.
Add activeDeadlineSeconds to the V1JobSpec to bound the maximum pod lifetime:

```python
spec=client.V1JobSpec(
    active_deadline_seconds=3600,  # or make configurable
    template=client.V1PodTemplateSpec(...),
)
```

This applies to both _create_namespaced_job and _create_finalizer_job.
```python
    force_cpu=force_cpu,
)
logger.debug(f"GPU Device requests: {device_requests}")
user = _get_container_user(algorithm=algorithm)
```
Redundant Zenodo metadata fetch — pre-fetched result should be passed in
run_job already calls _get_zenodo_metadata_and_archive_url (around line 648) to compute additional_files_path. Then _download_additional_files calls the same function again internally (line 157), making two network round-trips to Zenodo for the same data.
Pass the already-fetched metadata into _download_additional_files (or restructure it to accept an optional pre-fetched result) to avoid the redundant call:

```python
def _download_additional_files(
    algorithm: AlgorithmData,
    pod_name: str,
    namespace: str,
    mount_path: str = "/data",
    zenodo_response: Optional[tuple] = None,  # pass pre-fetched result
) -> Path:
```

```python
    )
    try:
        core_v1_api.delete_namespaced_pod(
            name=pod_name_to_delete, namespace=namespace
```
run_job is 130+ lines and does too many things — consider decomposing
run_job currently handles: name generation, PVC creation, device resolution, command building, job creation, pod readiness polling, file upload, additional file download, parameter upload, sentinel touch, log observation, completion polling, finalizer job creation, output download, and output sanity check — all inline.
Consider extracting into focused helpers, e.g.:

- `_prepare_job_resources` (PVC + name generation)
- `_upload_input_data` (file upload + additional files + parameters + sentinel)
- `_wait_for_job_completion` (pod polling + log collection)
- `_retrieve_output` (finalizer job + download)

This would make each step independently testable and the overall flow much easier to follow.
```python
]
if shm_size is not None:
    shm_size_formatted = shm_size.replace("gb", "Gi")
    volumes.append(
```
Use uuid.uuid4() instead of random.choices for name generation

`random.choices(string.ascii_lowercase + string.digits, k=12)` draws from only 36 characters (36^12 ≈ 5×10^18 combinations) via the module-level PRNG and provides no guarantee of uniqueness. uuid.uuid4() is the idiomatic choice, has better collision properties, and removes the random and string imports:

```python
import uuid

pvc_name = f"brats-{uuid.uuid4().hex[:12]}-pvc"
job_name = f"brats-{uuid.uuid4().hex[:12]}-job"
```
job_name = f"brats-{uuid.uuid4().hex[:12]}-job"|
|
||
```python
while resp.is_open():
    resp.update(timeout=1)
    if resp.peek_stdout():
```
Incorrect type annotation: `relative_to: Path = None`

`Path = None` is not a valid type annotation for an optional parameter — the type and default are inconsistent. Mypy/pyright will flag this.

Change to:

```python
relative_to: Optional[Path] = None,
parent_dir: Optional[Path] = None,
```
MarcelRosier left a comment
Sorry for the super late review (and then it's even AI-driven 🤖). However, I also checked manually and think all the comments are valid, especially the one about the hard dependency. Do you have time to address the issues? @SimoneBendazzoli93
fixes #113