Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ google-api-core==2.25.1
google-api-python-client==2.181.0
lief==0.17.1
lzfse>=0.4.2
objectstore-client>=0.0.14
pillow>=11.3.0
pillow_heif>=1.1.0
protobuf>=5.29.5,<6
Expand Down
1 change: 1 addition & 0 deletions src/launchpad/api/update_api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class UpdateData(BaseModel):
apple_app_info: Optional[AppleAppInfo] = None
android_app_info: Optional[AndroidAppInfo] = None
dequeued_at: Optional[datetime] = Field(None, description="Timestamp when message was dequeued from Kafka")
app_icon_id: Optional[str] = None

@field_serializer("dequeued_at")
def serialize_datetime(self, dt: datetime | None) -> str | None:
Expand Down
71 changes: 58 additions & 13 deletions src/launchpad/artifact_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
"""Artifact processing logic extracted from LaunchpadService."""

from __future__ import annotations

import contextlib
Expand All @@ -13,7 +11,15 @@

import sentry_sdk

from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import PreprodArtifactEvents
from objectstore_client import (
Client as ObjectstoreClient,
)
from objectstore_client import (
Usecase,
)
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
PreprodArtifactEvents,
)

from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel
from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel
Expand All @@ -39,18 +45,24 @@
from launchpad.size.models.apple import AppleAppInfo
from launchpad.size.models.common import BaseAppInfo
from launchpad.tracing import request_context
from launchpad.utils.file_utils import IdPrefix, id_from_bytes
from launchpad.utils.logging import get_logger
from launchpad.utils.statsd import StatsdInterface, get_statsd

logger = get_logger(__name__)


class ArtifactProcessor:
"""Handles the processing of artifacts including download, analysis, and upload."""

def __init__(self, sentry_client: SentryClient, statsd: StatsdInterface) -> None:
def __init__(
self,
sentry_client: SentryClient,
statsd: StatsdInterface,
objectstore_client: ObjectstoreClient | None,
) -> None:
self._sentry_client = sentry_client
self._statsd = statsd
self._objectstore_client = objectstore_client
self._objectstore_usecase = Usecase(name="preprod")

@staticmethod
def process_message(
Expand All @@ -72,15 +84,18 @@ def process_message(

initialize_sentry_sdk()

organization_id = payload["organization_id"]
project_id = payload["project_id"]
artifact_id = payload["artifact_id"]

if statsd is None:
statsd = get_statsd()
if artifact_processor is None:
sentry_client = SentryClient(base_url=service_config.sentry_base_url)
artifact_processor = ArtifactProcessor(sentry_client, statsd)

organization_id = payload["organization_id"]
project_id = payload["project_id"]
artifact_id = payload["artifact_id"]
objectstore_client = None
if service_config.objectstore_url is not None:
objectstore_client = ObjectstoreClient(service_config.objectstore_url)
artifact_processor = ArtifactProcessor(sentry_client, statsd, objectstore_client)

requested_features = []
for feature in payload.get("requested_features", []):
Expand Down Expand Up @@ -140,14 +155,15 @@ def process_artifact(
path = stack.enter_context(self._download_artifact(organization_id, project_id, artifact_id))
artifact = self._parse_artifact(organization_id, project_id, artifact_id, path)
analyzer = self._create_analyzer(artifact)

app_icon_object_id = self._process_app_icon(organization_id, project_id, artifact_id, artifact)
info = self._preprocess_artifact(
organization_id,
project_id,
artifact_id,
artifact,
analyzer,
dequeued_at,
app_icon_object_id,
)

if PreprodFeature.SIZE_ANALYSIS in requested_features:
Expand Down Expand Up @@ -212,11 +228,12 @@ def _preprocess_artifact(
artifact: Artifact,
analyzer: AndroidAnalyzer | AppleAppAnalyzer,
dequeued_at: datetime,
app_icon_id: str | None,
) -> AppleAppInfo | BaseAppInfo:
logger.info(f"Preprocessing for {artifact_id} (project: {project_id}, org: {organization_id})")
try:
info = analyzer.preprocess(cast(Any, artifact))
update_data = self._prepare_update_data(info, artifact, dequeued_at)
update_data = self._prepare_update_data(info, artifact, dequeued_at, app_icon_id)
self._sentry_client.update_artifact(
org=organization_id,
project=project_id,
Expand All @@ -237,6 +254,32 @@ def _preprocess_artifact(
else:
return info

def _process_app_icon(
    self,
    organization_id: str,
    project_id: str,
    artifact_id: str,
    artifact: Artifact,
) -> str | None:
    """Extract the app icon from the artifact and upload it to objectstore.

    Returns the content-addressed icon id (``icn_<hash>``) on success, or
    ``None`` when no objectstore client is configured or the artifact has
    no icon.
    """
    if self._objectstore_client is None:
        # Graceful degradation: this only happens when the OBJECTSTORE_URL
        # env variable isn't set. Log at WARNING (not INFO) so the
        # misconfiguration is visible in log triage.
        logger.warning(
            f"No objectstore client found for {artifact_id} (project: {project_id}, org: {organization_id})"
        )
        return None

    logger.info(f"Processing app icon for {artifact_id} (project: {project_id}, org: {organization_id})")
    app_icon = artifact.get_app_icon()
    if app_icon is None:
        # Not every artifact bundles an icon; this is an expected outcome.
        logger.info(f"No app icon found for {artifact_id} (project: {project_id}, org: {organization_id})")
        return None

    # Content-addressed id: identical icons across uploads dedupe to one object.
    image_id = id_from_bytes(app_icon, IdPrefix.ICON)
    icon_key = f"{organization_id}/{project_id}/{image_id}"
    logger.info(f"Uploading app icon to object store: {icon_key}")
    session = self._objectstore_client.session(self._objectstore_usecase, org=organization_id, project=project_id)
    session.put(app_icon, id=icon_key)
    return image_id

def _do_distribution(
self,
organization_id: str,
Expand Down Expand Up @@ -416,6 +459,7 @@ def _prepare_update_data(
app_info: AppleAppInfo | BaseAppInfo,
artifact: Artifact,
dequeued_at: datetime,
app_icon_id: str | None,
) -> Dict[str, Any]:
def _get_artifact_type(artifact: Artifact) -> ArtifactType:
if isinstance(artifact, ZippedXCArchive):
Expand Down Expand Up @@ -459,6 +503,7 @@ def _get_artifact_type(artifact: Artifact) -> ArtifactType:
apple_app_info=apple_app_info,
android_app_info=android_app_info,
dequeued_at=dequeued_at,
app_icon_id=app_icon_id,
)

return update_data.model_dump(exclude_none=True)
Expand Down
3 changes: 3 additions & 0 deletions src/launchpad/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,23 @@ class ServiceConfig:

sentry_base_url: str
projects_to_skip: list[str]
objectstore_url: str | None


def get_service_config() -> ServiceConfig:
    """Read the service configuration from environment variables.

    - ``SENTRY_BASE_URL`` falls back to the in-cluster default when unset.
    - ``PROJECT_IDS_TO_SKIP`` is a comma-separated list (empty when unset).
    - ``OBJECTSTORE_URL`` is optional and stays ``None`` when unset.
    """
    skip_raw = os.getenv("PROJECT_IDS_TO_SKIP")
    return ServiceConfig(
        sentry_base_url=os.getenv("SENTRY_BASE_URL", "http://getsentry.default"),
        projects_to_skip=skip_raw.split(",") if skip_raw else [],
        objectstore_url=os.getenv("OBJECTSTORE_URL"),
    )


Expand Down
74 changes: 26 additions & 48 deletions src/launchpad/utils/file_utils.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,78 @@
"""File utilities for app size analyzer."""

import hashlib
import shutil
import tempfile

from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import IO

from .logging import get_logger

logger = get_logger(__name__)

_HASH_CHUNK_SIZE = 8192

def calculate_file_hash(file_path: Path, algorithm: str = "md5") -> str:
"""Calculate hash of a file.
Args:
file_path: Path to the file
algorithm: Hash algorithm to use ("md5", "sha1", "sha256")

Returns:
Hexadecimal hash string
class IdPrefix(Enum):
    """Prefixes for content-addressed object ids built by ``id_from_bytes``."""

    # Short tags embedded at the front of object-store keys, e.g. "icn_ab12cd34ef56".
    ICON = "icn"
    SNAPSHOT = "snap"

Raises:
ValueError: If algorithm is not supported
FileNotFoundError: If file doesn't exist
"""
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")

def _calculate_hash(data: IO[bytes], algorithm: str) -> str:
hasher = None
if algorithm == "md5":
hasher = hashlib.md5()
elif algorithm == "sha1":
hasher = hashlib.sha1()
elif algorithm == "sha256":
hasher = hashlib.sha256()
else:

if hasher is None:
raise ValueError(f"Unsupported hash algorithm: {algorithm}")

for chunk in iter(lambda: data.read(_HASH_CHUNK_SIZE), b""):
hasher.update(chunk)

return hasher.hexdigest()


def id_from_bytes(data: bytes, prefix: IdPrefix) -> str:
    """Build a short content-addressed id: ``<prefix>_<first 12 hex chars of sha256>``."""
    digest = _calculate_hash(BytesIO(data), "sha256")
    return f"{prefix.value}_{digest[:12]}"


def calculate_file_hash(file_path: Path, algorithm: str = "md5") -> str:
    """Calculate the hex digest of a file's contents.

    Args:
        file_path: Path to the file.
        algorithm: Hash algorithm to use ("md5", "sha1", "sha256").

    Returns:
        Hexadecimal hash string.

    Raises:
        FileNotFoundError: If the file doesn't exist.
        ValueError: If the algorithm is not supported.
        RuntimeError: If reading or hashing the file fails.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    try:
        with open(file_path, "rb") as f:
            return _calculate_hash(f, algorithm)
    except ValueError:
        # Unsupported algorithm: propagate as-is (matching the pre-refactor
        # behavior) instead of masking it as a RuntimeError.
        raise
    except Exception as e:
        # Chain the cause so the original I/O error stays in the traceback.
        raise RuntimeError(f"Failed to calculate hash for {file_path}: {e}") from e


def get_file_size(file_path: Path) -> int:
    """Return the size of *file_path* in bytes.

    Raises:
        FileNotFoundError: If the file doesn't exist.
    """
    if file_path.exists():
        return file_path.stat().st_size
    raise FileNotFoundError(f"File not found: {file_path}")


def to_nearest_block_size(file_size: int, block_size: int) -> int:
    """Round *file_size* up to the next multiple of the filesystem block size."""

    # Zero-byte files occupy no blocks.
    if file_size == 0:
        return 0

    full_blocks, remainder = divmod(file_size, block_size)
    return (full_blocks + (1 if remainder else 0)) * block_size


def create_temp_directory(prefix: str = "app-analyzer-") -> Path:
    """Create and return a fresh temporary directory whose name starts with *prefix*."""
    created = Path(tempfile.mkdtemp(prefix=prefix))
    logger.debug(f"Created temporary directory: {created}")
    return created


def cleanup_directory(directory: Path) -> None:
    """Recursively delete *directory* if it exists and is a directory; no-op otherwise."""
    if not (directory.exists() and directory.is_dir()):
        return
    shutil.rmtree(directory)
    logger.debug(f"Cleaned up directory: {directory}")
3 changes: 3 additions & 0 deletions tests/integration/test_kafka_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def test_process_message_with_skipped_project(self):
service_config = ServiceConfig(
sentry_base_url="http://test.sentry.io",
projects_to_skip=["skip-project"],
objectstore_url="http://test.objectstore.io",
)

test_message = {
Expand All @@ -196,6 +197,7 @@ def test_process_message_with_allowed_project(self):
service_config = ServiceConfig(
sentry_base_url="http://test.sentry.io",
projects_to_skip=["other-project"],
objectstore_url="http://test.objectstore.io",
)

test_message = {
Expand Down Expand Up @@ -226,6 +228,7 @@ def test_process_message_error_handling(self):
service_config = ServiceConfig(
sentry_base_url="http://test.sentry.io",
projects_to_skip=[],
objectstore_url="http://test.objectstore.io",
)

test_message = {
Expand Down
Loading
Loading