From 662c1ee43c38784f25b07ccb501f9f1edde29ee7 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 21 Aug 2025 15:41:22 +0300 Subject: [PATCH 1/2] Add Elixir Security Live V2 Importer Pipeline #1933 * Add Elixir Security Live V2 Importer * Add tests for the Elixir Security Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/importers/__init__.py | 9 ++ .../elixir_security_live_importer.py | 140 ++++++++++++++++++ .../test_elixir_security_live_importer_v2.py | 98 ++++++++++++ 3 files changed, 247 insertions(+) create mode 100644 vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py create mode 100644 vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index c0cf04ed7..ac674bff4 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -82,6 +82,9 @@ from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2 from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2 from vulnerabilities.utils import create_registry +from vulnerabilities.pipelines.v2_importers import ( + elixir_security_live_importer as elixir_security_live_importer_v2, +) IMPORTERS_REGISTRY = create_registry( [ @@ -196,3 +199,9 @@ for key, value in IMPORTERS_REGISTRY.items() if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo ] + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + elixir_security_live_importer_v2.ElixirSecurityLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py new file mode 100644 index 000000000..6f3e181cb --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py @@ -0,0 +1,140 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from typing import Iterable + +import requests +import yaml +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.elixir_security_importer import ( + ElixirSecurityImporterPipeline, +) + + +class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline): + """ + Elixir Security Advisories Importer Pipeline + + This pipeline imports security advisories for a single elixir PURL. + """ + + pipeline_id = "elixir_security_live_importer_v2" + supported_types = ["hex"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for ElixirSecurityLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def advisories_count(self) -> int: + if self.purl.type != "hex": + return 0 + + try: + directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}" + response = requests.get(directory_url) + + if response.status_code != 200: + return 0 + + yaml_files = [file for file in response.json() if file["name"].endswith(".yml")] + return len(yaml_files) + except Exception: + return 0 + + def collect_advisories(self) -> Iterable[AdvisoryData]: + if self.purl.type != "hex": + self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer") + return [] + + package_name = self.purl.name + + try: + directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}" + response = requests.get(directory_url) + + if response.status_code != 200: + self.log(f"No advisories found for {package_name} in Elixir Security Database") + return [] + + yaml_entries = [file for file in response.json() if file["name"].endswith(".yml")] + + for entry in yaml_entries: + # entry["path"] looks like: packages//.yml + file_path = entry["path"] + content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}" + content_response = requests.get( + content_url, headers={"Accept": "application/vnd.github.v3.raw"} + ) + + if content_response.status_code != 200: + self.log(f"Failed to fetch file content for {file_path}") + continue + + advisory_text = content_response.text + + try: + yaml_file = yaml.safe_load(advisory_text) or {} + except Exception as e: + self.log(f"Failed to parse YAML for {file_path}: {e}") + continue + + for advisory in self.build_advisory_from_yaml( + yaml_file=yaml_file, advisory_text=advisory_text, relative_path=file_path + ): + if self.purl.version and not self._advisory_affects_version(advisory): + continue + yield advisory + + except Exception as e: + self.log(f"Error fetching advisories for {self.purl}: {str(e)}") + return [] + + def _advisory_affects_version(self, advisory: AdvisoryData) -> bool: + if not self.purl.version: + return True + + for affected_package in advisory.affected_packages: + if affected_package.affected_version_range: + try: + purl_version = SemverVersion(self.purl.version) + + if purl_version in affected_package.affected_version_range: + return True + except Exception as e: + self.log(f"Failed to parse version {self.purl.version}: {str(e)}") + return True + + return False diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py new file mode 100644 index 000000000..b2d267cd3 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py @@ -0,0 +1,98 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import shutil +from pathlib import Path +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import ( + ElixirSecurityLiveImporterPipeline, +) + + +@pytest.fixture +def test_data_dir(): + return Path(__file__).parent.parent.parent / "test_data" / "elixir_security" + + +@patch("requests.get") +def test_package_first_mode_with_version_filter(mock_get, test_data_dir): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + advisory_file_path = test_data_dir / "test_file.yml" + advisory_content = advisory_file_path.read_text() + + content_response = MagicMock() + content_response.status_code = 200 + content_response.text = advisory_content + + mock_get.side_effect = [directory_response, content_response] + + # Version affected + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 1 + + # Version not affected + mock_get.side_effect = [directory_response, content_response] + purl = PackageURL(type="hex", name="coherence", version="0.5.2") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +@patch("requests.get") +def test_package_first_mode_no_advisories(mock_get): + mock_response = MagicMock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + purl = PackageURL(type="hex", name="nonexistent-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + with pytest.raises(ValueError): + importer.get_purl_inputs() + + +@patch("requests.get") +def test_package_first_mode_api_error(mock_get): + directory_response = MagicMock() + directory_response.status_code = 200 + directory_response.json.return_value = [ + {"name": "test_file.yml", "path": "packages/coherence/test_file.yml"} + ] + + content_response = MagicMock() + content_response.status_code = 500 + + mock_get.side_effect = [directory_response, content_response] + + purl = PackageURL(type="hex", name="coherence", version="0.5.1") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 + + +def test_package_first_mode_non_hex_purl(): + purl = PackageURL(type="npm", name="some-package") + importer = ElixirSecurityLiveImporterPipeline(purl=purl) + with pytest.raises(ValueError): + importer.get_purl_inputs() From 94e76a273c1c78a162d513e9161afe732a89b33e Mon Sep 17 00:00:00 2001 From: ziad hany Date: Thu, 7 May 2026 02:21:12 +0300 Subject: [PATCH 2/2] Migrate Elixir Security Importer live Pipeline Update the Elixir Security Importer so we can have a separate function for parsing yaml file Signed-off-by: ziad hany --- vulnerabilities/importers/__init__.py | 6 +- .../v2_importers/elixir_security_importer.py | 40 ++++------ .../elixir_security_live_importer.py | 80 +++++++------------ .../test_elixir_security_live_importer_v2.py | 10 +-- vulnerabilities/utils.py | 4 +- 5 files changed, 56 insertions(+), 84 deletions(-) diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index ac674bff4..349255879 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -52,6 +52,9 @@ from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import ( + elixir_security_live_importer as elixir_security_live_importer_v2, +) from vulnerabilities.pipelines.v2_importers import epss_importer_v2 from vulnerabilities.pipelines.v2_importers import fireeye_importer_v2 from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2 @@ -82,9 +85,6 @@ from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2 from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2 from vulnerabilities.utils import create_registry -from vulnerabilities.pipelines.v2_importers import ( - elixir_security_live_importer as elixir_security_live_importer_v2, -) IMPORTERS_REGISTRY = create_registry( [ diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py index 2269d0fbc..8b80246bf 100644 --- a/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_importer.py @@ -58,32 +58,26 @@ def advisories_count(self) -> int: return count def collect_advisories(self) -> Iterable[AdvisoryDataV2]: - try: - base_path = Path(self.vcs_response.dest_dir) - vuln = base_path / "packages" - for file in vuln.glob("**/*.yml"): - yield from self.process_file(file, base_path) - finally: - if self.vcs_response: - self.vcs_response.delete() + base_path = Path(self.vcs_response.dest_dir) + vuln = base_path / "packages" + for file in vuln.glob("**/*.yml"): + relative_path = str(file.relative_to(base_path)).strip("/") + path_segments = str(file).split("/") + # use the last two segments as the advisory ID + advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") + advisory_url = f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" + + yaml_file = load_yaml(str(file)) + yield from self.build_advisory_from_text( + advisory_id=advisory_id, advisory_url=advisory_url, yaml_file=yaml_file + ) def on_failure(self): self.clean_downloads() - def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]: - relative_path = str(file.relative_to(base_path)).strip("/") - path_segments = str(file).split("/") - # use the last two segments as the advisory ID - advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") - advisory_url = ( - f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}" - ) - advisory_text = None - with open(str(file)) as f: - advisory_text = f.read() - - yaml_file = load_yaml(str(file)) - + def build_advisory_from_text( + self, advisory_id, advisory_url, yaml_file + ) -> Iterable[AdvisoryDataV2]: summary = yaml_file.get("description") or "" pkg_name = yaml_file.get("package") or "" @@ -138,5 +132,5 @@ def process_file(self, file, base_path) -> Iterable[AdvisoryDataV2]: affected_packages=affected_packages, url=advisory_url, date_published=date_published, - original_advisory_text=advisory_text or str(yaml_file), + original_advisory_text=str(yaml_file), ) diff --git a/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py index 6f3e181cb..931ada2b9 100644 --- a/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py +++ b/vulnerabilities/pipelines/v2_importers/elixir_security_live_importer.py @@ -10,14 +10,14 @@ from typing import Iterable import requests -import yaml from packageurl import PackageURL from univers.versions import SemverVersion -from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.pipelines.v2_importers.elixir_security_importer import ( ElixirSecurityImporterPipeline, ) +from vulnerabilities.utils import fetch_yaml class ElixirSecurityLiveImporterPipeline(ElixirSecurityImporterPipeline): @@ -53,34 +53,13 @@ def get_purl_inputs(self): f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" ) - if not purl.version: - raise ValueError(f"PURL: {purl!s} is expected to have a version") - self.purl = purl def advisories_count(self) -> int: - if self.purl.type != "hex": - return 0 - - try: - directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{self.purl.name}" - response = requests.get(directory_url) - - if response.status_code != 200: - return 0 - - yaml_files = [file for file in response.json() if file["name"].endswith(".yml")] - return len(yaml_files) - except Exception: - return 0 - - def collect_advisories(self) -> Iterable[AdvisoryData]: - if self.purl.type != "hex": - self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer") - return [] + return 0 + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: package_name = self.purl.name - try: directory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/packages/{package_name}" response = requests.get(directory_url) @@ -94,27 +73,21 @@ def collect_advisories(self) -> Iterable[AdvisoryData]: for entry in yaml_entries: # entry["path"] looks like: packages//.yml file_path = entry["path"] - content_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}" - content_response = requests.get( - content_url, headers={"Accept": "application/vnd.github.v3.raw"} + advisory_url = f"https://api.github.com/repos/dependabot/elixir-security-advisories/contents/{file_path}" + advisory_text = fetch_yaml( + advisory_url, headers={"Accept": "application/vnd.github.v3.raw"} ) - if content_response.status_code != 200: - self.log(f"Failed to fetch file content for {file_path}") - continue + path_segments = str(file_path).split("/") + # use the last two segments as the advisory ID + advisory_id = "/".join(path_segments[-2:]).replace(".yml", "") - advisory_text = content_response.text - - try: - yaml_file = yaml.safe_load(advisory_text) or {} - except Exception as e: - self.log(f"Failed to parse YAML for {file_path}: {e}") - continue - - for advisory in self.build_advisory_from_yaml( - yaml_file=yaml_file, advisory_text=advisory_text, relative_path=file_path + for advisory in self.build_advisory_from_text( + advisory_id=advisory_id, + yaml_file=advisory_text, + advisory_url=advisory_url, ): - if self.purl.version and not self._advisory_affects_version(advisory): + if self.purl.version and not self.validate_advisory(advisory): continue yield advisory @@ -122,19 +95,24 @@ def collect_advisories(self) -> Iterable[AdvisoryData]: self.log(f"Error fetching advisories for {self.purl}: {str(e)}") return [] - def _advisory_affects_version(self, advisory: AdvisoryData) -> bool: + def validate_advisory(self, advisory: AdvisoryDataV2) -> bool: if not self.purl.version: return True for affected_package in advisory.affected_packages: - if affected_package.affected_version_range: - try: - purl_version = SemverVersion(self.purl.version) - - if purl_version in affected_package.affected_version_range: - return True - except Exception as e: - self.log(f"Failed to parse version {self.purl.version}: {str(e)}") + try: + purl_version = SemverVersion(self.purl.version) + if ( + affected_package.affected_version_range + and purl_version in affected_package.affected_version_range + ) or ( + affected_package.fixed_version_range + and purl_version in affected_package.fixed_version_range + ): return True + except Exception as e: + self.log(f"Failed to parse version {self.purl.version}: {str(e)}") + # Since we have a small package file, if we fail to parse the versions, we can just return all of them + return True return False diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py index b2d267cd3..07827568b 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_elixir_security_live_importer_v2.py @@ -7,7 +7,6 @@ # See https://aboutcode.org for more information about nexB OSS projects. # -import shutil from pathlib import Path from unittest.mock import MagicMock from unittest.mock import patch @@ -15,7 +14,6 @@ import pytest from packageurl import PackageURL -from vulnerabilities.importer import AdvisoryData from vulnerabilities.pipelines.v2_importers.elixir_security_live_importer import ( ElixirSecurityLiveImporterPipeline, ) @@ -39,7 +37,7 @@ def test_package_first_mode_with_version_filter(mock_get, test_data_dir): content_response = MagicMock() content_response.status_code = 200 - content_response.text = advisory_content + content_response.content = advisory_content mock_get.side_effect = [directory_response, content_response] @@ -67,8 +65,9 @@ def test_package_first_mode_no_advisories(mock_get): purl = PackageURL(type="hex", name="nonexistent-package") importer = ElixirSecurityLiveImporterPipeline(purl=purl) - with pytest.raises(ValueError): - importer.get_purl_inputs() + importer.get_purl_inputs() + advisories = list(importer.collect_advisories()) + assert len(advisories) == 0 @patch("requests.get") @@ -81,6 +80,7 @@ def test_package_first_mode_api_error(mock_get): content_response = MagicMock() content_response.status_code = 500 + content_response.content = b"" mock_get.side_effect = [directory_response, content_response] diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index 2e618a920..6c8a0b341 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -77,8 +77,8 @@ def load_toml(path): return toml.load(f) -def fetch_yaml(url): - response = requests.get(url) +def fetch_yaml(url, headers=None): + response = requests.get(url, headers=headers) return saneyaml.load(response.content)