Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/2096.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed syncing of cosign signatures, attestations, and SBOMs (stored as companion tags) being silently skipped when `include_tags` was set on the remote.
102 changes: 80 additions & 22 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

log = logging.getLogger(__name__)

# Suffixes of cosign "V2" companion tags, which are named ``sha256-<digest>.<suffix>``:
# ``.sig`` holds signatures, ``.att`` attestations and ``.sbom`` SBOMs of the image with
# that digest.
COSIGN_TAG_SUFFIXES = (".sig", ".att", ".sbom")


class ContainerFirstStage(Stage):
"""
Expand All @@ -60,6 +62,9 @@ def __init__(self, remote, signed_only):
self.manifest_list_dcs = []
self.manifest_dcs = []
self.signature_dcs = []
self._synced_digests = set()
self._full_tag_list = []
self._cosign_tags = []

async def _download_manifest_data(self, manifest_url):
downloader = self.remote.get_downloader(url=manifest_url)
Expand Down Expand Up @@ -92,24 +97,54 @@ async def run(self):
"""
ContainerFirstStage.
"""

to_download = []
BATCH_SIZE = 500

# The signature source can be either a separate sigstore location or a registry with the extended signatures API
signature_source = await self.get_signature_source()

async with ProgressReport(
message="Downloading tag list", code="sync.downloading.tag_list", total=1
) as pb:
repo_name = self.remote.namespaced_upstream_name
tag_list_url = "/v2/{name}/tags/list".format(name=repo_name)
tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name)
tag_list = filter_resources(
tag_list, self.remote.include_tags, self.remote.exclude_tags
self._full_tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name)
self._cosign_tags = filter_resources(
self._full_tag_list, ["sha256-*"], self.remote.exclude_tags
)
if self.remote.include_tags or self.remote.exclude_tags:
exclude_tags_and_cosign = (self.remote.exclude_tags or []) + ["sha256-*"]
tag_list = filter_resources(
self._full_tag_list, self.remote.include_tags, exclude_tags_and_cosign
)
else:
tag_list = self._full_tag_list
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment this inline a bit better?

e.g. if the goal was to fix the cosign tags being silently skipped, then why are the cosign tags being added to the exclusion list here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the point just to make a separate call to self._process_tags() w/ a separate log message?

await pb.aincrement()

await self._process_tags(tag_list, signature_source)

if self.remote.include_tags or self.remote.exclude_tags:
companion_tags = self._find_cosign_companion_tags()
if companion_tags:
log.info(
"Syncing %d cosign companion tag(s) for filtered images",
len(companion_tags),
)
await self._process_tags(
companion_tags, signature_source, msg="Processing Cosign Companion Tags"
)

def _find_cosign_companion_tags(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise some comments about the structure of the tags and the transformations going on here would be nice.

"""Find cosign companion tags for synced digests."""
companion_tags = []
for tag in self._cosign_tags:
tag_without_suffix = tag.rsplit(".", 1)[0]
digest = tag_without_suffix.replace("-", ":", 1)
if digest in self._synced_digests:
companion_tags.append(tag)
return companion_tags

async def _process_tags(self, tag_list, signature_source, msg="Processing Tags"):
"""Download and process a batch of tags, creating declarative content objects."""
BATCH_SIZE = 500
to_download = []

for tag_name in tag_list:
relative_url = "/v2/{name}/manifests/{tag}".format(
name=self.remote.namespaced_upstream_name, tag=tag_name
Expand All @@ -121,7 +156,7 @@ async def run(self):
)

async with ProgressReport(
message="Processing Tags",
message=msg,
code="sync.processing.tag",
total=len(tag_list),
) as pb_parsed_tags:
Expand All @@ -135,25 +170,21 @@ async def run(self):

digest = calculate_digest(raw_text_data)
tag_name = response.url.split("/")[-1]
media_type = determine_media_type(content_data, response)

# Look for cosign signatures
# cosign signature has a tag convention 'sha256-1234.sig'
if self.signed_only and not signature_source:
if (
not (tag_name.endswith(".sig") and tag_name.startswith("sha256-"))
and f"sha256-{digest.removeprefix('sha256:')}.sig" not in tag_list
if not (
self._is_cosign_companion_tag(tag_name, media_type, content_data)
or await self._has_cosign_signature(digest)
):
# skip this tag, there is no corresponding signature
log.info(
"The unsigned image {digest} can't be synced "
"due to a requirement to sync signed content "
"only.".format(digest=digest)
)
# Count the skipped tags as parsed too.
await pb_parsed_tags.aincrement()
continue

media_type = determine_media_type(content_data, response)
validate_manifest(content_data, media_type, digest)

tag_dc = DeclarativeContent(Tag(name=tag_name))
Expand Down Expand Up @@ -183,23 +214,21 @@ async def run(self):
tag=tag_name,
)
)
# do not pass down the pipeline a manifest list with unsigned
# manifests.
break
self.signature_dcs.extend(man_sig_dcs)
list_dc.extra_data["listed_manifests"].append(listed_manifest)

else:
# Manifest indices can be signed too. It is not mandatory.
# If signature is available mirror it.
self._synced_digests.add(digest)
if signature_source is not None:
list_sig_dcs = await self.create_signatures(list_dc, signature_source)
if list_sig_dcs:
self.signature_dcs.extend(list_sig_dcs)
# only pass the manifest list and tag down the pipeline if there were no
# issues with signatures (no `break` in the `for` loop)
tag_dc.extra_data["tagged_manifest_dc"] = list_dc
for listed_manifest in list_dc.extra_data["listed_manifests"]:
self._synced_digests.add(listed_manifest["manifest_dc"].content.digest)
await self.handle_blobs(
listed_manifest["manifest_dc"], listed_manifest["content_data"]
)
Expand All @@ -215,9 +244,9 @@ async def run(self):
if signature_source is not None:
man_sig_dcs = await self.create_signatures(man_dc, signature_source)
if self.signed_only and not man_sig_dcs:
# do not pass down the pipeline unsigned manifests
continue
self.signature_dcs.extend(man_sig_dcs)
self._synced_digests.add(digest)
tag_dc.extra_data["tagged_manifest_dc"] = man_dc
await self.handle_blobs(man_dc, content_data)
self.tag_dcs.append(tag_dc)
Expand All @@ -239,6 +268,35 @@ async def run(self):

await self.resolve_flush()

async def _has_cosign_signature(self, digest):
    """
    Tell whether the upstream tag list contains a cosign signature for ``digest``.

    Args:
        digest: Manifest digest in the ``sha256:<hex>`` form.

    Returns:
        True when a ``sha256-<hex>.sig`` tag is listed in ``self._cosign_tags``, or when a
        bare ``sha256-<hex>`` tag is listed and its downloaded manifest is recognized by
        ``_is_cosign_companion_tag``; False otherwise.
    """
    companion_tag = digest.replace("sha256:", "sha256-")
    known_tags = self._cosign_tags

    # V2 layout: the signature lives under an explicit ".sig" tag.
    if f"{companion_tag}.sig" in known_tags:
        return True

    if companion_tag not in known_tags:
        return False

    # Potential V3 cosign tag: the name alone is not conclusive, so the manifest has to be
    # fetched and inspected.
    manifest_path = "/v2/{name}/manifests/{tag}".format(
        name=self.remote.namespaced_upstream_name, tag=companion_tag
    )
    manifest_url = urljoin(self.remote.url, manifest_path)
    content_data, _, response = await self._download_manifest_data(manifest_url)
    media_type = determine_media_type(content_data, response)
    return bool(self._is_cosign_companion_tag(companion_tag, media_type, content_data))

def _is_cosign_companion_tag(self, tag_name, media_type, content_data):
    """
    Check if a fetched tag is a cosign companion tag.

    Companion tag names always start with ``sha256-``. Two layouts are recognized:

    * V2: ``sha256-<digest>.<suffix>`` with a suffix from ``COSIGN_TAG_SUFFIXES``.
    * V3: a bare ``sha256-<digest>`` (exactly 71 characters) whose manifest is an OCI
      index with a non-empty ``manifests`` list in which every entry carries an
      ``artifactType`` key.

    Args:
        tag_name: Name of the tag that was fetched.
        media_type: Media type determined for the fetched manifest.
        content_data: Parsed manifest content (a mapping).

    Returns:
        True if the tag is a cosign companion tag, False otherwise.
    """
    # NOTE(review): ``self`` is not used here; reviewers pointed out that this makes the
    # helper easy to unit test in isolation.
    if not tag_name.startswith("sha256-"):
        return False

    # 71 == len("sha256-") + 64, the length of a hex-encoded SHA-256 digest.
    if len(tag_name) == 71:
        # V3 cosign companion tags are index lists with each entry having an artifactType
        if media_type != MEDIA_TYPE.INDEX_OCI:
            return False
        manifests = content_data.get("manifests", [])
        # Index entries are plain dicts, so key presence is tested with ``in``
        # (dicts have no ``.has()`` method).
        return bool(manifests) and all("artifactType" in entry for entry in manifests)

    # V2 cosign companion tags are in the format sha256-<digest>.<suffix>
    return tag_name.endswith(COSIGN_TAG_SUFFIXES)

async def get_signature_source(self):
"""
Find out where signatures come from: sigstore, extension API or not available at all.
Expand Down
53 changes: 51 additions & 2 deletions pulp_container/tests/functional/api/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from pulp_container.constants import MANIFEST_TYPE, MEDIA_TYPE
from pulp_container.tests.functional.constants import (
PULP_COSIGN_COMPANION_TAGS,
PULP_FIXTURE_1,
PULP_HELLO_WORLD_LINUX_AMD64_DIGEST,
PULP_LABELED_FIXTURE,
Expand All @@ -23,12 +24,12 @@ def synced_container_repository_factory(
container_repository_factory, container_remote_factory, container_repository_api, container_sync
):
def _synced_container_repository_factory(
url=REGISTRY_V2_FEED_URL, include_tags=None, exclude_tags=None
url=REGISTRY_V2_FEED_URL, include_tags=None, exclude_tags=None, upstream_name=PULP_FIXTURE_1
):
"""Sync a new repository with the included tags passed as an argument."""
remote = container_remote_factory(
url,
upstream_name=PULP_FIXTURE_1,
upstream_name=upstream_name,
include_tags=include_tags,
exclude_tags=exclude_tags,
)
Expand Down Expand Up @@ -187,3 +188,51 @@ def test_sync_with_complex_filtering(
tags = container_tag_api.list(repository_version=synced_repo.latest_version_href).results

assert sorted(include_tags) == sorted(tag.name for tag in tags)


@pytest.mark.parallel
def test_sync_cosign_companion_tags(
    synced_container_repository_factory, container_tag_api, container_manifest_api
):
    """Sync the cosign fixture repository unfiltered and check the content totals."""
    repo = synced_container_repository_factory(upstream_name=PULP_COSIGN_COMPANION_TAGS)
    version_href = repo.latest_version_href

    tag_page = container_tag_api.list(repository_version=version_href)
    manifest_page = container_manifest_api.list(repository_version=version_href)

    # 4 regular tags + 5 cosign companion tags
    assert tag_page.count == 9
    # 2 manifest lists + 11 manifests
    assert manifest_page.count == 13


@pytest.mark.parallel
def test_sync_cosign_companion_tags_with_filtering(
    synced_container_repository_factory, container_tag_api, container_manifest_api
):
    """Sync the cosign fixture repository with tag filters and check companion tags follow."""
    scenarios = (
        # (tag filter, expected tag count, expected manifest count)
        # manifest_a, sha256-<a-digest>.sig, sha256-<a-digest>.att
        ({"include_tags": ["manifest_a"]}, 3, 3),
        # manifest_b, sha256-<b-digest>.sig, sha256-<b-digest>
        # The V3 sig is a manifest list with 2 manifests
        ({"include_tags": ["manifest_b"]}, 3, 5),
        # manifest_b, manifest_c, manifest_d,
        # sha256-<b-digest>.sig, sha256-<b-digest>, sha256-<c-digest>
        ({"exclude_tags": ["manifest_a"]}, 6, 10),
    )

    for tag_filter, expected_tags, expected_manifests in scenarios:
        repo = synced_container_repository_factory(
            upstream_name=PULP_COSIGN_COMPANION_TAGS, **tag_filter
        )
        version_href = repo.latest_version_href

        tag_page = container_tag_api.list(repository_version=version_href)
        manifest_page = container_manifest_api.list(repository_version=version_href)

        assert tag_page.count == expected_tags
        assert manifest_page.count == expected_manifests
2 changes: 2 additions & 0 deletions pulp_container/tests/functional/api/test_sync_signatures.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def synced_repository(
upstream_name=DEPRECATED_REPOSITORY_NAME,
policy="on_demand",
include_tags=[MANIFEST_LIST_TAG, IMAGE_MANIFEST_TAG],
exclude_tags=["sha256-*"], # exclude cosign companion tags
)

if request.param["sigstore"]:
Expand Down Expand Up @@ -137,6 +138,7 @@ def test_sync_image_with_pqc_signatures(
upstream_name=UBI10_MICRO_REPOSITORY_NAME,
policy="on_demand",
include_tags=[UBI10_MICRO_TAG],
exclude_tags=["sha256-*"],
sigstore=SIGSTORE_URL,
)
remote = container_remote_factory(**data)
Expand Down
12 changes: 12 additions & 0 deletions pulp_container/tests/functional/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,15 @@

REGISTRY_V2_REPO_PULP = f"{REGISTRY_V2}/{PULP_FIXTURE_1}"
REGISTRY_V2_REPO_HELLO_WORLD = f"{REGISTRY_V2}/{PULP_HELLO_WORLD_REPO}"

# A fixture repository containing cosign companion tags.
PULP_COSIGN_COMPANION_TAGS = "pulp/cosign-tags"
# Layout of the repository:
#   * 4 regular tags: manifest_a, manifest_b, manifest_c, manifest_d
#   * 5 cosign companion tags:
#       - manifest_a: sha256-<digest>.sig (V2: 1 signature),
#                     sha256-<digest>.att (V2: 1 attestation)
#       - manifest_b: sha256-<digest>.sig (V2: 2 signatures),
#                     sha256-<digest>     (V3: 2 signatures)
#       - manifest_c: sha256-<digest>     (V3: 1 signature, 1 attestation)
# V2 signatures are stored in one manifest, with each signature in a separate layer.
# V3 signatures are collected in one manifest list, with each signature getting its own
# manifest.
# In total the repository contains 2 manifest lists and 11 manifests.
Loading