diff --git a/requirements.txt b/requirements.txt
index 8358fa21..da155d76 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,6 +9,7 @@ google-api-core==2.25.1
 google-api-python-client==2.181.0
 lief==0.17.1
 lzfse>=0.4.2
+objectstore-client>=0.0.14
 pillow>=11.3.0
 pillow_heif>=1.1.0
 protobuf>=5.29.5,<6
diff --git a/src/launchpad/api/update_api_models.py b/src/launchpad/api/update_api_models.py
index a564c90b..b57fe0d5 100644
--- a/src/launchpad/api/update_api_models.py
+++ b/src/launchpad/api/update_api_models.py
@@ -63,6 +63,7 @@ class UpdateData(BaseModel):
     apple_app_info: Optional[AppleAppInfo] = None
     android_app_info: Optional[AndroidAppInfo] = None
     dequeued_at: Optional[datetime] = Field(None, description="Timestamp when message was dequeued from Kafka")
+    app_icon_id: Optional[str] = None

     @field_serializer("dequeued_at")
     def serialize_datetime(self, dt: datetime | None) -> str | None:
diff --git a/src/launchpad/artifact_processor.py b/src/launchpad/artifact_processor.py
index 716b3c7a..4c5ad0d1 100644
--- a/src/launchpad/artifact_processor.py
+++ b/src/launchpad/artifact_processor.py
@@ -1,5 +1,3 @@
-"""Artifact processing logic extracted from LaunchpadService."""
-
 from __future__ import annotations

 import contextlib
@@ -13,7 +11,15 @@

 import sentry_sdk

-from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import PreprodArtifactEvents
+from objectstore_client import (
+    Client as ObjectstoreClient,
+)
+from objectstore_client import (
+    Usecase,
+)
+from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
+    PreprodArtifactEvents,
+)

 from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel
 from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel
@@ -39,6 +45,7 @@
 from launchpad.size.models.apple import AppleAppInfo
 from launchpad.size.models.common import BaseAppInfo
 from launchpad.tracing import request_context
+from launchpad.utils.file_utils import IdPrefix, id_from_bytes
 from launchpad.utils.logging import get_logger
 from launchpad.utils.statsd import StatsdInterface, get_statsd

@@ -46,11 +53,16 @@


 class ArtifactProcessor:
-    """Handles the processing of artifacts including download, analysis, and upload."""
-
-    def __init__(self, sentry_client: SentryClient, statsd: StatsdInterface) -> None:
+    def __init__(
+        self,
+        sentry_client: SentryClient,
+        statsd: StatsdInterface,
+        objectstore_client: ObjectstoreClient | None,
+    ) -> None:
         self._sentry_client = sentry_client
         self._statsd = statsd
+        self._objectstore_client = objectstore_client
+        self._objectstore_usecase = Usecase(name="preprod")

     @staticmethod
     def process_message(
@@ -72,15 +84,18 @@ def process_message(

         initialize_sentry_sdk()

+        organization_id = payload["organization_id"]
+        project_id = payload["project_id"]
+        artifact_id = payload["artifact_id"]
+
         if statsd is None:
             statsd = get_statsd()
         if artifact_processor is None:
             sentry_client = SentryClient(base_url=service_config.sentry_base_url)
-            artifact_processor = ArtifactProcessor(sentry_client, statsd)
-
-        organization_id = payload["organization_id"]
-        project_id = payload["project_id"]
-        artifact_id = payload["artifact_id"]
+            objectstore_client = None
+            if service_config.objectstore_url is not None:
+                objectstore_client = ObjectstoreClient(service_config.objectstore_url)
+            artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client)

         requested_features = []
         for feature in payload.get("requested_features", []):
@@ -140,7 +155,7 @@ def process_artifact(
             path = stack.enter_context(self._download_artifact(organization_id, project_id, artifact_id))
             artifact = self._parse_artifact(organization_id, project_id, artifact_id, path)
             analyzer = self._create_analyzer(artifact)
-
+            app_icon_object_id = self._process_app_icon(organization_id, project_id, artifact_id, artifact)
             info = self._preprocess_artifact(
                 organization_id,
                 project_id,
@@ -148,6 +163,7 @@
                 artifact,
                 analyzer,
                 dequeued_at,
+                app_icon_object_id,
             )

             if PreprodFeature.SIZE_ANALYSIS in requested_features:
@@ -212,11 +228,12 @@ def _preprocess_artifact(
         artifact: Artifact,
         analyzer: AndroidAnalyzer | AppleAppAnalyzer,
         dequeued_at: datetime,
+        app_icon_id: str | None,
     ) -> AppleAppInfo | BaseAppInfo:
         logger.info(f"Preprocessing for {artifact_id} (project: {project_id}, org: {organization_id})")
         try:
             info = analyzer.preprocess(cast(Any, artifact))
-            update_data = self._prepare_update_data(info, artifact, dequeued_at)
+            update_data = self._prepare_update_data(info, artifact, dequeued_at, app_icon_id)
             self._sentry_client.update_artifact(
                 org=organization_id,
                 project=project_id,
@@ -237,6 +254,32 @@
         else:
             return info

+    def _process_app_icon(
+        self,
+        organization_id: str,
+        project_id: str,
+        artifact_id: str,
+        artifact: Artifact,
+    ) -> str | None:
+        if self._objectstore_client is None:
+            logger.info(
+                f"No objectstore client found for {artifact_id} (project: {project_id}, org: {organization_id})"
+            )
+            return None
+
+        logger.info(f"Processing app icon for {artifact_id} (project: {project_id}, org: {organization_id})")
+        app_icon = artifact.get_app_icon()
+        if app_icon is None:
+            logger.info(f"No app icon found for {artifact_id} (project: {project_id}, org: {organization_id})")
+            return None
+
+        image_id = id_from_bytes(app_icon, IdPrefix.ICON)
+        icon_key = f"{organization_id}/{project_id}/{image_id}"
+        logger.info(f"Uploading app icon to object store: {icon_key}")
+        session = self._objectstore_client.session(self._objectstore_usecase, org=organization_id, project=project_id)
+        session.put(app_icon, id=icon_key)
+        return image_id
+
     def _do_distribution(
         self,
         organization_id: str,
@@ -416,6 +459,7 @@ def _prepare_update_data(
         app_info: AppleAppInfo | BaseAppInfo,
         artifact: Artifact,
         dequeued_at: datetime,
+        app_icon_id: str | None,
     ) -> Dict[str, Any]:
         def _get_artifact_type(artifact: Artifact) -> ArtifactType:
             if isinstance(artifact, ZippedXCArchive):
@@ -459,6 +503,7 @@ def _get_artifact_type(artifact: Artifact) -> ArtifactType:
             apple_app_info=apple_app_info,
             android_app_info=android_app_info,
             dequeued_at=dequeued_at,
+            app_icon_id=app_icon_id,
         )

         return update_data.model_dump(exclude_none=True)
diff --git a/src/launchpad/service.py b/src/launchpad/service.py
index 5320dd6d..84a2722b 100644
--- a/src/launchpad/service.py
+++ b/src/launchpad/service.py
@@ -127,6 +127,7 @@ class ServiceConfig:

     sentry_base_url: str
     projects_to_skip: list[str]
+    objectstore_url: str | None


 def get_service_config() -> ServiceConfig:
@@ -134,6 +135,7 @@ def get_service_config() -> ServiceConfig:
     sentry_base_url = os.getenv("SENTRY_BASE_URL")
     projects_to_skip_str = os.getenv("PROJECT_IDS_TO_SKIP")
     projects_to_skip = projects_to_skip_str.split(",") if projects_to_skip_str else []
+    objectstore_url = os.getenv("OBJECTSTORE_URL")

     if sentry_base_url is None:
         sentry_base_url = "http://getsentry.default"
@@ -141,6 +143,7 @@

     return ServiceConfig(
         sentry_base_url=sentry_base_url,
         projects_to_skip=projects_to_skip,
+        objectstore_url=objectstore_url,
     )
diff --git a/src/launchpad/utils/file_utils.py b/src/launchpad/utils/file_utils.py
index 96595cad..977975ca 100644
--- a/src/launchpad/utils/file_utils.py
+++ b/src/launchpad/utils/file_utils.py
@@ -1,65 +1,58 @@
-"""File utilities for app size analyzer."""
-
 import hashlib
 import shutil
 import tempfile
+from enum import Enum
+from io import BytesIO
 from pathlib import Path
+from typing import IO

 from .logging import get_logger

 logger = get_logger(__name__)

+_HASH_CHUNK_SIZE = 8192

-def calculate_file_hash(file_path: Path, algorithm: str = "md5") -> str:
-    """Calculate hash of a file.
-
-    Args:
-        file_path: Path to the file
-        algorithm: Hash algorithm to use ("md5", "sha1", "sha256")
-
-    Returns:
-        Hexadecimal hash string
+class IdPrefix(Enum):
+    ICON = "icn"
+    SNAPSHOT = "snap"

-    Raises:
-        ValueError: If algorithm is not supported
-        FileNotFoundError: If file doesn't exist
-    """
-    if not file_path.exists():
-        raise FileNotFoundError(f"File not found: {file_path}")

+def _calculate_hash(data: IO[bytes], algorithm: str) -> str:
+    hasher = None
     if algorithm == "md5":
         hasher = hashlib.md5()
     elif algorithm == "sha1":
         hasher = hashlib.sha1()
     elif algorithm == "sha256":
         hasher = hashlib.sha256()
-    else:
+
+    if hasher is None:
         raise ValueError(f"Unsupported hash algorithm: {algorithm}")

+    for chunk in iter(lambda: data.read(_HASH_CHUNK_SIZE), b""):
+        hasher.update(chunk)
+
+    return hasher.hexdigest()
+
+
+def id_from_bytes(data: bytes, prefix: IdPrefix) -> str:
+    return f"{prefix.value}_{_calculate_hash(BytesIO(data), 'sha256')[:12]}"
+
+
+def calculate_file_hash(file_path: Path, algorithm: str = "md5") -> str:
+    if not file_path.exists():
+        raise FileNotFoundError(f"File not found: {file_path}")
+
     try:
         with open(file_path, "rb") as f:
-            # Read file in chunks to handle large files efficiently
-            for chunk in iter(lambda: f.read(8192), b""):
-                hasher.update(chunk)
-
-            return hasher.hexdigest()
+            return _calculate_hash(f, algorithm)
     except Exception as e:
         raise RuntimeError(f"Failed to calculate hash for {file_path}: {e}")


 def get_file_size(file_path: Path) -> int:
-    """Get file size in bytes.
-
-    Args:
-        file_path: Path to the file
-
-    Returns:
-        File size in bytes
-
-    Raises:
-        FileNotFoundError: If file doesn't exist
-    """
     if not file_path.exists():
         raise FileNotFoundError(f"File not found: {file_path}")
@@ -67,8 +60,6 @@ def get_file_size(file_path: Path) -> int:


 def to_nearest_block_size(file_size: int, block_size: int) -> int:
-    """Round file size up to the nearest filesystem block size."""
-
     if file_size == 0:
         return 0

@@ -76,25 +67,12 @@


 def create_temp_directory(prefix: str = "app-analyzer-") -> Path:
-    """Create a temporary directory.
-
-    Args:
-        prefix: Prefix for the temporary directory name
-
-    Returns:
-        Path to the created temporary directory
-    """
     temp_dir = Path(tempfile.mkdtemp(prefix=prefix))
     logger.debug(f"Created temporary directory: {temp_dir}")
     return temp_dir


 def cleanup_directory(directory: Path) -> None:
-    """Remove a directory and all its contents.
-
-    Args:
-        directory: Directory to remove
-    """
     if directory.exists() and directory.is_dir():
         shutil.rmtree(directory)
         logger.debug(f"Cleaned up directory: {directory}")
diff --git a/tests/integration/test_kafka_service.py b/tests/integration/test_kafka_service.py
index 7fd5fab3..4a882e95 100644
--- a/tests/integration/test_kafka_service.py
+++ b/tests/integration/test_kafka_service.py
@@ -176,6 +176,7 @@ def test_process_message_with_skipped_project(self):
         service_config = ServiceConfig(
             sentry_base_url="http://test.sentry.io",
             projects_to_skip=["skip-project"],
+            objectstore_url="http://test.objectstore.io",
         )

         test_message = {
@@ -196,6 +197,7 @@ def test_process_message_with_allowed_project(self):
         service_config = ServiceConfig(
             sentry_base_url="http://test.sentry.io",
             projects_to_skip=["other-project"],
+            objectstore_url="http://test.objectstore.io",
         )

         test_message = {
@@ -226,6 +228,7 @@ def test_process_message_error_handling(self):
         service_config = ServiceConfig(
             sentry_base_url="http://test.sentry.io",
             projects_to_skip=[],
+            objectstore_url="http://test.objectstore.io",
         )

         test_message = {
diff --git a/tests/unit/artifacts/test_artifact_processor.py b/tests/unit/artifacts/test_artifact_processor.py
index 7b1cb1f4..fe63fa46 100644
--- a/tests/unit/artifacts/test_artifact_processor.py
+++ b/tests/unit/artifacts/test_artifact_processor.py
@@ -1,8 +1,9 @@
-"""Tests for ArtifactProcessor including error handling, retry logic, and message processing."""
-
 from unittest.mock import Mock, patch

-from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import PreprodArtifactEvents
+from objectstore_client import Client as ObjectstoreClient
+from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
+    PreprodArtifactEvents,
+)

 from launchpad.artifact_processor import ArtifactProcessor
 from launchpad.constants import (
@@ -16,13 +17,12 @@


 class TestArtifactProcessorErrorHandling:
-    """Test error handling and retry logic in ArtifactProcessor."""
-
     def setup_method(self):
         """Set up test fixtures."""
         mock_sentry_client = Mock(spec=SentryClient)
         mock_statsd = Mock()
-        self.processor = ArtifactProcessor(mock_sentry_client, mock_statsd)
+        mock_objectstore_client = Mock(spec=ObjectstoreClient)
+        self.processor = ArtifactProcessor(mock_sentry_client, mock_statsd, mock_objectstore_client)

     def test_update_artifact_error_success(self):
         """Test that _update_artifact_error successfully updates artifact with error."""
@@ -147,7 +147,11 @@ class TestArtifactProcessorMessageHandling:
     def test_process_message_ios(self, mock_process, mock_sentry_client):
         """Test processing iOS artifact messages."""
         fake_statsd = FakeStatsd()
-        service_config = ServiceConfig(sentry_base_url="http://test.sentry.io", projects_to_skip=[])
+        service_config = ServiceConfig(
+            sentry_base_url="http://test.sentry.io",
+            projects_to_skip=[],
+            objectstore_url="http://test.objectstore.io",
+        )

         # Create a payload for iOS artifact
         payload: PreprodArtifactEvents = {
@@ -162,20 +166,33 @@ def test_process_message_ios(self, mock_process, mock_sentry_client):

         # Verify process_artifact was called with correct args
         mock_process.assert_called_once_with(
-            "test-org-123", "test-project-ios", "ios-test-123", [PreprodFeature.SIZE_ANALYSIS]
+            "test-org-123",
+            "test-project-ios",
+            "ios-test-123",
+            [PreprodFeature.SIZE_ANALYSIS],
         )

         # Verify metrics were recorded
         calls = fake_statsd.calls
-        assert ("increment", {"metric": "artifact.processing.started", "value": 1, "tags": None}) in calls
-        assert ("increment", {"metric": "artifact.processing.completed", "value": 1, "tags": None}) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.started", "value": 1, "tags": None},
+        ) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.completed", "value": 1, "tags": None},
+        ) in calls

     @patch("launchpad.artifact_processor.SentryClient")
     @patch.object(ArtifactProcessor, "process_artifact")
     def test_process_message_android(self, mock_process, mock_sentry_client):
         """Test processing Android artifact messages."""
         fake_statsd = FakeStatsd()
-        service_config = ServiceConfig(sentry_base_url="http://test.sentry.io", projects_to_skip=[])
+        service_config = ServiceConfig(
+            sentry_base_url="http://test.sentry.io",
+            projects_to_skip=[],
+            objectstore_url="http://test.objectstore.io",
+        )

         # Create a payload for Android artifact
         payload: PreprodArtifactEvents = {
@@ -198,15 +215,25 @@ def test_process_message_android(self, mock_process, mock_sentry_client):

         # Verify metrics were recorded
         calls = fake_statsd.calls
-        assert ("increment", {"metric": "artifact.processing.started", "value": 1, "tags": None}) in calls
-        assert ("increment", {"metric": "artifact.processing.completed", "value": 1, "tags": None}) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.started", "value": 1, "tags": None},
+        ) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.completed", "value": 1, "tags": None},
+        ) in calls

     @patch("launchpad.artifact_processor.SentryClient")
     @patch.object(ArtifactProcessor, "process_artifact")
     def test_process_message_error(self, mock_process, mock_sentry_client):
         """Test error handling in message processing."""
         fake_statsd = FakeStatsd()
-        service_config = ServiceConfig(sentry_base_url="http://test.sentry.io", projects_to_skip=[])
+        service_config = ServiceConfig(
+            sentry_base_url="http://test.sentry.io",
+            projects_to_skip=[],
+            objectstore_url="http://test.objectstore.io",
+        )

         # Make process_artifact raise an exception
         mock_process.side_effect = RuntimeError("Download failed: HTTP 404")
@@ -224,7 +251,10 @@ def test_process_message_error(self, mock_process, mock_sentry_client):

         # Verify process_artifact was called
         mock_process.assert_called_once_with(
-            "test-org", "test-project", "test-123", [PreprodFeature.SIZE_ANALYSIS, PreprodFeature.BUILD_DISTRIBUTION]
+            "test-org",
+            "test-project",
+            "test-123",
+            [PreprodFeature.SIZE_ANALYSIS, PreprodFeature.BUILD_DISTRIBUTION],
         )

         # Verify the metrics were called correctly
@@ -240,7 +270,9 @@ def test_process_message_project_skipped(self, mock_process, mock_sentry_client)
         """Test that projects in the skip list are not processed."""
         fake_statsd = FakeStatsd()
         service_config = ServiceConfig(
-            sentry_base_url="http://test.sentry.io", projects_to_skip=["skip-project-1", "skip-project-2"]
+            sentry_base_url="http://test.sentry.io",
+            projects_to_skip=["skip-project-1", "skip-project-2"],
+            objectstore_url="http://test.objectstore.io",
         )

         # Create a payload for a project that should be skipped
@@ -266,7 +298,11 @@ def test_process_message_project_not_skipped(self, mock_process, mock_sentry_client):
         """Test that projects not in the skip list are processed normally."""
         fake_statsd = FakeStatsd()

-        service_config = ServiceConfig(sentry_base_url="http://test.sentry.io", projects_to_skip=["other-project"])
+        service_config = ServiceConfig(
+            sentry_base_url="http://test.sentry.io",
+            projects_to_skip=["other-project"],
+            objectstore_url="http://test.objectstore.io",
+        )

         # Create a payload for a project that should NOT be skipped
         payload: PreprodArtifactEvents = {
@@ -289,5 +325,11 @@ def test_process_message_project_not_skipped(self, mock_process, mock_sentry_cli

         # Verify normal metrics were recorded
         calls = fake_statsd.calls
-        assert ("increment", {"metric": "artifact.processing.started", "value": 1, "tags": None}) in calls
-        assert ("increment", {"metric": "artifact.processing.completed", "value": 1, "tags": None}) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.started", "value": 1, "tags": None},
+        ) in calls
+        assert (
+            "increment",
+            {"metric": "artifact.processing.completed", "value": 1, "tags": None},
+        ) in calls