diff --git a/docs/HEARTBEAT_SYSTEM_README.md b/docs/HEARTBEAT_SYSTEM_README.md
new file mode 100644
index 000000000..3006ef9a0
--- /dev/null
+++ b/docs/HEARTBEAT_SYSTEM_README.md
@@ -0,0 +1,634 @@
+# Profiling Control System with Heartbeat Protocol
+
+This document describes the implementation of a centralized profiling control system in which a Performance Studio backend dynamically issues profiling commands (start/stop) to gProfiler agents via a heartbeat protocol.
+
+## System Overview
+
+```
+┌─────────────────────┐     Heartbeat     ┌──────────────────────┐
+│                     │ ◄──────────────►  │                      │
+│ Performance Studio  │                   │   gProfiler Agent    │
+│       Backend       │     Commands      │                      │
+│                     │ ────────────────► │                      │
+└─────────────────────┘                   └──────────────────────┘
+```
+
+### Key Components
+
+1. **Performance Studio Backend** - Central control server that:
+   - Receives profiling requests via REST API
+   - Manages profiling commands for hosts/services
+   - Responds to agent heartbeats with pending commands
+   - Tracks command execution status
+
+2. **gProfiler Agent** - Profiling agent that:
+   - Sends periodic heartbeats to the backend
+   - Receives and executes profiling commands
+   - Ensures idempotent command execution
+   - Reports command completion status
+
+## Features
+
+### ✅ Backend Features
+- **REST API** for submitting profiling requests
+- **Heartbeat endpoint** for agent communication
+- **Command merging** for multiple requests targeting the same host
+- **Process-level and host-level** stop commands
+- **Idempotent command execution** using unique command IDs
+- **Command completion tracking**
+- **PerfSpect integration** for hardware metrics collection
+
+### ✅ Agent Features
+- **Heartbeat communication** with configurable intervals
+- **Dynamic profiling** based on server commands
+- **Command-driven execution** (start/stop profiling)
+- **Idempotency** to prevent duplicate command execution
+- **Persistent command tracking** across agent restarts
+- **Graceful error handling** and retry logic
+- **PerfSpect auto-installation** for hardware metrics collection
+- **Hardware metrics integration** with CPU profiling data
+
+## API Endpoints
+
+### 1. Submit Profiling Request
+
+```http
+POST /api/metrics/profile_request
+```
+
+**Request Body:**
+```json
+{
+    "service_name": "my-service",
+    "command_type": "start",       // "start" or "stop"
+    "duration": 60,
+    "frequency": 11,
+    "profiling_mode": "cpu",
+    "target_hostnames": ["host1", "host2"],
+    "pids": [1234, 5678],          // Optional: specific PIDs
+    "stop_level": "process",       // "process" or "host" (for stop commands)
+    "additional_args": {
+        "enable_perfspect": true   // Optional: enable hardware metrics collection
+    }
+}
+```
+
+**Response:**
+```json
+{
+    "success": true,
+    "message": "Start profiling request submitted successfully",
+    "request_id": "req-uuid",
+    "command_id": "cmd-uuid",
+    "estimated_completion_time": "2025-01-08T12:00:00Z"
+}
+```
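+
+A minimal client-side sketch of this call using Python's `requests` package is shown below. The endpoint and field names follow the example above; the backend URL, token value, and bearer-token header (the scheme the agent's heartbeat client uses) are illustrative assumptions:
+
+```python
+import requests
+
+BACKEND = "http://performance-studio:8000"  # placeholder backend URL
+TOKEN = "your-token"                        # placeholder auth token
+
+
+def submit_profile_request() -> dict:
+    """Submit a start-profiling request and return the backend's response."""
+    payload = {
+        "service_name": "my-service",
+        "command_type": "start",
+        "duration": 60,
+        "frequency": 11,
+        "profiling_mode": "cpu",
+        "target_hostnames": ["host1", "host2"],
+    }
+    resp = requests.post(
+        f"{BACKEND}/api/metrics/profile_request",
+        json=payload,
+        headers={"Authorization": f"Bearer {TOKEN}"},  # assumed auth scheme
+        timeout=30,
+    )
+    resp.raise_for_status()
+    return resp.json()  # contains request_id and command_id on success
+```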
+
+### 2. Agent Heartbeat
+
+```http
+POST /api/metrics/heartbeat
+```
+
+**Request Body:**
+```json
+{
+    "ip_address": "192.168.1.100",
+    "hostname": "worker-01",
+    "service_name": "my-service",
+    "last_command_id": "cmd-uuid",
+    "available_pids": {"java": {}, "python": {}},
+    "namespaces": [
+        {
+            "namespace": "kube_system",
+            "pods": [
+                {"pod_name": "gprofiler", "containers": [{"pid": 123, "name": "metrics-exporter"}]},
+                {"pod_name": "webapp", "containers": [{"pid": 123, "name": "metrics-exporter"}]}
+            ]
+        }
+    ],
+    "status": "active",
+    "timestamp": "2025-01-08T11:00:00Z"
+}
+```
+
+**Planned Kubernetes hierarchy support (work in progress).** The heartbeat payload carries the Kubernetes namespace/pod/container hierarchy, which the backend de-normalizes into host lookup tables:
+
+- "containers" -> "host" table: `{container_name, array_of_hosts}`
+- "pod" -> "host" table: `{pod_name, array_of_hosts}`
+- "namespace" -> "host" table: `{namespace, array_of_hosts}`
+
+Remaining work:
+1. Add Kubernetes namespace hierarchy info as part of the heartbeat
+2. Save Kubernetes information in the host-heartbeats table and create de-normalized containers-to-hosts, pods-to-hosts, and namespaces-to-hosts tables
+3. Perform profiling: support profiling requests by namespaces, pods, and containers (5)
+4. Test e2e (3)
+
+**Response:**
+```json
+{
+    "success": true,
+    "message": "Heartbeat received. New profiling command available.",
+    "profiling_command": {
+        "command_type": "start",
+        "combined_config": {
+            "duration": 60,
+            "frequency": 11,
+            "profiling_mode": "cpu",
+            "pids": ""
+        }
+    },
+    "command_id": "cmd-uuid"
+}
+```
+
+### 3. Report Command Completion
+
+```http
+POST /api/metrics/command_completion
+```
+
+**Request Body:**
+```json
+{
+    "command_id": "cmd-uuid",
+    "hostname": "worker-01",
+    "status": "completed",      // "completed" or "failed"
+    "execution_time": 65,
+    "error_message": null,
+    "results_path": "s3://bucket/path/to/results"
+}
+```
+
+## PerfSpect Hardware Metrics Integration
+
+The heartbeat system supports Intel PerfSpect integration for collecting hardware performance metrics alongside CPU profiling data. This feature enables comprehensive performance analysis by combining software-level profiling with hardware-level metrics.
+
+### Overview
+
+When `enable_perfspect: true` is included in the `additional_args` of a profiling request, the gProfiler agent will:
+
+1. **Auto-install PerfSpect**: Downloads and extracts the latest PerfSpect binary from GitHub releases
+2. **Configure hardware collection**: Enables the `--enable-hw-metrics-collection` flag
+3. **Set PerfSpect path**: Configures `--perfspect-path` to the auto-installed binary
+4. **Collect metrics**: Runs PerfSpect alongside CPU profiling to gather hardware metrics
+
+### Agent Behavior
+
+#### Command Processing
+When the agent receives a heartbeat response with `enable_perfspect: true` in the `combined_config`:
+
+```python
+# Agent processes the configuration
+if combined_config.get("enable_perfspect", False):
+    new_args.collect_hw_metrics = True
+
+    # Auto-install PerfSpect
+    from gprofiler.perfspect_installer import get_or_install_perfspect
+    perfspect_path = get_or_install_perfspect()
+    if perfspect_path:
+        new_args.tool_perfspect_path = str(perfspect_path)
+        logger.info(f"PerfSpect auto-installed at: {perfspect_path}")
+```
+
+#### Installation Process
+1. **Download**: Fetches `perfspect.tgz` from `https://github.com/intel/PerfSpect/releases/latest/download/perfspect.tgz`
+2. **Extract**: Unpacks to `/tmp/gprofiler_perfspect/perfspect/`
+3. **Verify**: Checks that the binary exists and is executable
+4. **Configure**: Sets the path for gProfiler to use
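+
+The installer module itself is not shown in this change; the following is a minimal hedged sketch of what `get_or_install_perfspect` could look like, assuming only the download URL and `/tmp/gprofiler_perfspect` layout from the steps above (everything else is illustrative):
+
+```python
+import os
+import tarfile
+import urllib.request
+from pathlib import Path
+from typing import Optional
+
+PERFSPECT_URL = "https://github.com/intel/PerfSpect/releases/latest/download/perfspect.tgz"
+INSTALL_DIR = Path("/tmp/gprofiler_perfspect")
+
+
+def get_or_install_perfspect() -> Optional[Path]:
+    """Return the path to the PerfSpect binary, downloading it on first use."""
+    binary = INSTALL_DIR / "perfspect" / "perfspect"
+    if binary.is_file() and os.access(binary, os.X_OK):
+        return binary  # already installed
+    try:
+        INSTALL_DIR.mkdir(parents=True, exist_ok=True)
+        archive = INSTALL_DIR / "perfspect.tgz"
+        urllib.request.urlretrieve(PERFSPECT_URL, str(archive))  # 1. download
+        with tarfile.open(archive) as tgz:
+            tgz.extractall(INSTALL_DIR)                          # 2. extract -> perfspect/
+        if binary.is_file() and os.access(binary, os.X_OK):      # 3. verify
+            return binary                                        # 4. caller configures the path
+    except Exception:
+        pass  # caller logs the failure and disables hardware metrics
+    return None
+```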
+
+#### Data Collection
+PerfSpect runs with the following command:
+```bash
+/tmp/gprofiler_perfspect/perfspect/perfspect metrics \
+  --duration 60 \
+  --output /tmp/perfspect_data
+```
+
+### Output Files
+
+When PerfSpect is enabled, additional files are generated:
+
+- **Hardware Metrics CSV**: `/tmp/perfspect_data/{hostname}_metrics.csv`
+- **Hardware Summary CSV**: `/tmp/perfspect_data/{hostname}_metrics_summary.csv`
+- **Hardware HTML Report**: `/tmp/perfspect_data/{hostname}_metrics_summary.html`
+- **Latest Metrics**: `/tmp/perfspect_data/{hostname}_metrics_summary_latest.csv`
+- **Latest HTML**: `/tmp/perfspect_data/{hostname}_metrics_summary_latest.html`
+
+### Example Request with PerfSpect
+
+```bash
+curl -X POST http://localhost:8000/api/metrics/profile_request \
+  -H "Content-Type: application/json" \
+  -d '{
+    "service_name": "web-service",
+    "command_type": "start",
+    "duration": 60,
+    "frequency": 11,
+    "profiling_mode": "cpu",
+    "target_hostnames": ["worker-01", "worker-02"],
+    "additional_args": {
+      "enable_perfspect": true
+    }
+  }'
+```
+
+### Combined Config Example
+
+The agent receives the following `combined_config` in heartbeat responses:
+
+```json
+{
+    "duration": 60,
+    "frequency": 11,
+    "continuous": true,
+    "command_type": "start",
+    "profiling_mode": "cpu",
+    "enable_perfspect": true
+}
+```
+
+### Requirements
+
+- **Platform**: Linux x86_64 (PerfSpect requirement)
+- **Permissions**: Root access for hardware performance counter access
+- **Network**: Internet access to download the PerfSpect binary
+- **Storage**: ~50MB for the PerfSpect installation and data files
+
+### Troubleshooting
+
+#### Common Issues
+
+1. **Permission Denied**: Ensure the agent runs with sufficient privileges
+   ```bash
+   sudo ./gprofiler --enable-heartbeat-server ...
+   ```
+
+2. **Download Failures**: Check network connectivity and GitHub access
+   ```bash
+   curl -I https://github.com/intel/PerfSpect/releases/latest/download/perfspect.tgz
+   ```
+
+3. **Binary Not Found**: Verify installation directory permissions
+   ```bash
+   ls -la /tmp/gprofiler_perfspect/perfspect/
+   ```
+
+#### Debug Logging
+
+Enable verbose logging to see PerfSpect installation and execution details:
+```bash
+./gprofiler --enable-heartbeat-server --verbose
+```
+
+Look for log messages:
+- `PerfSpect auto-installed at: /path/to/binary`
+- `Using perfspect path: /path/to/binary`
+- `Failed to auto-install PerfSpect, hardware metrics disabled`
+
+## Usage Examples
+
+### Backend - Submit Start Command
+
+```bash
+curl -X POST http://localhost:8000/api/metrics/profile_request \
+  -H "Content-Type: application/json" \
+  -d '{
+    "service_name": "web-service",
+    "command_type": "start",
+    "duration": 120,
+    "frequency": 11,
+    "profiling_mode": "cpu",
+    "target_hostnames": ["web-01", "web-02"],
+    "containers": [],
+    "pods": [],
+    "namespaces": []
+  }'
+```
+
+### Backend - Submit Stop Command
+
+```bash
+curl -X POST http://localhost:8000/api/metrics/profile_request \
+  -H "Content-Type: application/json" \
+  -d '{
+    "service_name": "web-service",
+    "command_type": "stop",
+    "stop_level": "host",
+    "target_hostnames": ["web-01"]
+  }'
+```
+
+### Agent - Run in Heartbeat Mode
+
+**Basic heartbeat mode:**
+```bash
+python gprofiler/main.py \
+  --enable-heartbeat-server \
+  --upload-results \
+  --token "your-token" \
+  --service-name "web-service" \
+  --api-server "http://performance-studio:8000" \
+  --heartbeat-interval 30 \
+  --output-dir /tmp/profiles \
+  --verbose
+```
+
+**Production deployment with all optimizations:**
+```bash
+# Set environment variables first
+export GPROFILER_TOKEN="my_token"
+export GPROFILER_SERVICE="your-service-name"
+export GPROFILER_SERVER="http://localhost:8080"
+
+# Production command (can also source /opt/gprofiler/envs.sh for variables)
+/opt/gprofiler/gprofiler \
+  -u \
+  --token=$GPROFILER_TOKEN \
+  --service-name=$GPROFILER_SERVICE \
+  --server-host $GPROFILER_SERVER \
+  --dont-send-logs \
+  --server-upload-timeout 10 \
+  -c \
+  --disable-metrics-collection \
+  --java-safemode= \
+  -d 60 \
+  --java-no-version-check
+```
+
+## Implementation Details
+
+### Backend Logic
+
+1. **Command Generation**: Each profiling request generates a unique `command_id`
+2. **Command Merging**: Multiple requests for the same host are merged into single commands
+3. **Stop Handling**:
+   - Process-level stops remove specific PIDs from commands
+   - Host-level stops terminate all profiling for the host
+4. **Heartbeat Response**: Returns pending commands with `command_type` and configuration
+
+### Agent Logic
+
+1. **Heartbeat Loop**: Sends heartbeats at configured intervals
+2. **Command Processing**:
+   - `start`: Stop the current profiler (if any) and start a new one with the given config
+   - `stop`: Stop the current profiler without starting a new one
+3. **Idempotency**: Track executed command IDs to prevent duplicates
+4. **Persistence**: Save executed command IDs to disk for restart resilience (see the sketch after the command flow below)
+
+### Command Flow
+
+```
+1. User submits profiling request to backend
+   ↓
+2. Backend creates command with unique ID
+   ↓
+3. Agent sends heartbeat to backend
+   ↓
+4. Backend responds with pending command
+   ↓
+5. Agent executes command (start/stop profiling)
+   ↓
+6. Agent reports completion to backend
+   ↓
+7. Backend updates command status
+```
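+
+The `HeartbeatClient` in this change tracks executed command IDs in memory only; the disk persistence described in step 4 of the agent logic could look like the following hedged sketch (the state-file location and JSON format are illustrative assumptions, not paths used by the shipped code):
+
+```python
+import json
+from pathlib import Path
+
+STATE_FILE = Path("/var/lib/gprofiler/executed_commands.json")  # hypothetical location
+
+
+def load_executed_ids() -> set:
+    """Load previously executed command IDs, tolerating a missing or corrupt file."""
+    try:
+        return set(json.loads(STATE_FILE.read_text()))
+    except (FileNotFoundError, json.JSONDecodeError):
+        return set()
+
+
+def mark_executed(command_id: str, executed: set, max_history: int = 1000) -> None:
+    """Record a command ID and persist a bounded history to disk."""
+    executed.add(command_id)
+    while len(executed) > max_history:
+        executed.pop()  # evicts an arbitrary ID; a production version would evict by age
+    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+    STATE_FILE.write_text(json.dumps(sorted(executed)))
+```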
+
+## Configuration
+
+### Backend Configuration
+- Database connection for command storage
+- API endpoints for profiling control
+- Command merging and deduplication logic
+
+### Agent Configuration
+```bash
+--enable-heartbeat-server    # Enable heartbeat mode
+--heartbeat-interval 30      # Heartbeat frequency (seconds)
+--api-server URL             # Backend server URL
+--upload-results             # Required for heartbeat mode
+--token TOKEN                # Authentication token
+--service-name NAME          # Service identifier
+```
+
+## Testing
+
+### Test Scripts
+
+1. **test_heartbeat_system.py** - Test backend API and heartbeat flow
+2. **run_heartbeat_agent.py** - Run agent in heartbeat mode for testing
+
+### Test Workflow
+
+1. Start Performance Studio backend
+2. Run test agent: `python run_heartbeat_agent.py`
+3. Submit test commands: `python test_heartbeat_system.py`
+4. Verify agent receives and executes commands
+5. Check idempotency and error handling
+
+## Error Handling
+
+### Backend
+- Validates profiling request parameters
+- Handles database connection errors
+- Returns appropriate HTTP status codes
+- Logs all operations for debugging
+
+### Agent
+- Retries failed heartbeats with backoff
+- Continues heartbeat loop on command execution errors
+- Persists executed command IDs across restarts
+- Graceful shutdown on termination signals
+
+## Security Considerations
+
+- **Authentication**: Token-based authentication for agent-backend communication
+- **Authorization**: Service-based access control for profiling commands
+- **Command Validation**: Validate all command parameters before execution
+- **Rate Limiting**: Prevent abuse of profiling requests
+- **Audit Logging**: Track all profiling activities for compliance
+
+## Future Enhancements
+
+- **Real-time Status**: WebSocket connection for real-time agent status
+- **Command Scheduling**: Schedule profiling commands for future execution
+- **Resource Monitoring**: Check system resources before starting profiling
+- **Multi-tenant Support**: Isolation between different services/teams
+- **Command Prioritization**: Priority queues for urgent profiling requests
+- **Distributed Coordination**: Coordinate profiling across multiple agents
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Agent not receiving commands**
+   - Check network connectivity to backend
+   - Verify authentication token
+   - Check service name matching
+
+2. **Commands not executing**
+   - Check agent logs for errors
+   - Verify command parameters are valid
+   - Check system permissions for profiling
+
+3. **Duplicate commands**
+   - Verify idempotency implementation
+   - Check command ID persistence
+   - Review heartbeat timing
+
+4. 
**PerfSpect hardware metrics not working** + - Ensure Linux x86_64 platform (PerfSpect requirement) + - Verify root/sudo permissions for hardware counters + - Check internet connectivity for auto-installation + - Look for "PerfSpect auto-installed" or "Failed to auto-install" log messages + - Verify `/tmp/gprofiler_perfspect/perfspect/perfspect` binary exists and is executable + +### Debugging + +- Enable verbose logging: `--verbose` +- Check heartbeat logs: `/tmp/gprofiler-heartbeat.log` +- Monitor backend API logs +- Use test scripts to isolate issues +- For PerfSpect issues: + - Check PerfSpect installation: `ls -la /tmp/gprofiler_perfspect/perfspect/` + - Test PerfSpect manually: `/tmp/gprofiler_perfspect/perfspect/perfspect --help` + - Check PerfSpect data directory: `ls -la /tmp/perfspect_data/` + - Monitor hardware metrics collection in agent logs + +## Building and Running gProfiler Locally + +### Prerequisites +- Linux system (x86_64 or Aarch64) +- Python 3.10+ for source builds +- Docker for containerized builds +- 16GB+ RAM for full builds +- Root access for profiling operations + +### Build Options + +#### 1. Build Executable (Recommended) + +```bash +cd gprofiler + +# Full build (takes 20-30 minutes, builds all profilers from source) +./scripts/build_x86_64_executable.sh + +# Fast build (for development, skips some optimizations) +./scripts/build_x86_64_executable.sh --fast +``` + +The executable will be created at `build/x86_64/gprofiler`. + +#### 2. Build Docker Image + +```bash +./scripts/build_x86_64_container.sh -t gprofiler +``` + +#### 3. Run from Source (Development) + +```bash +# Install dependencies +pip3 install -r requirements.txt + +# Copy required resources +./scripts/copy_resources_from_image.sh + +# Run directly from source (requires root) +sudo python3 -m gprofiler [options] +``` + +### Running Locally + +#### Basic Local Profiling + +```bash +# Make executable and run basic profiling +chmod +x build/x86_64/gprofiler +sudo ./build/x86_64/gprofiler -o /tmp/gprofiler-output -d 30 +``` + +#### Production-Style Local Run + +```bash +# Set environment variables +export GPROFILER_TOKEN="my_token" +export GPROFILER_SERVICE="your-service-name" +export GPROFILER_SERVER="http://localhost:8080" + +# Run with production flags +sudo ./build/x86_64/gprofiler \ + -u \ + --token=$GPROFILER_TOKEN \ + --service-name=$GPROFILER_SERVICE \ + --server-host $GPROFILER_SERVER \ + --dont-send-logs \ + --server-upload-timeout 10 \ + -c \ + --disable-metrics-collection \ + --java-safemode= \ + -d 60 \ + --java-no-version-check +``` + +#### Local Heartbeat Mode Testing + +```bash +# Run agent in heartbeat mode for testing +sudo ./build/x86_64/gprofiler \ + --enable-heartbeat-server \ + --upload-results \ + --token=$GPROFILER_TOKEN \ + --service-name=$GPROFILER_SERVICE \ + --api-server $GPROFILER_SERVER \ + --heartbeat-interval 30 \ + --output-dir /tmp/profiles \ + --dont-send-logs \ + --server-upload-timeout 10 \ + --disable-metrics-collection \ + --java-safemode= \ + --java-no-version-check \ + --verbose +``` + +#### Local PerfSpect Testing (Manual) + +```bash +# Test PerfSpect integration manually (Linux x86_64 only) +sudo ./build/x86_64/gprofiler \ + --enable-hw-metrics-collection \ + --perfspect-path /path/to/perfspect \ + --perfspect-duration 60 \ + --output-dir /tmp/profiles \ + --duration 60 \ + --verbose +``` + +### Command Line Options Explained + +```bash +-u, --upload-results # Upload results to Performance Studio +--token=$GPROFILER_TOKEN # Authentication token 
+--service-name=$GPROFILER_SERVICE # Service identifier +--server-host $GPROFILER_SERVER # Performance Studio backend URL +--dont-send-logs # Disable log transmission +--server-upload-timeout 10 # Upload timeout (seconds) +-c, --continuous # Continuous profiling mode +--disable-metrics-collection # Disable system metrics collection +--java-safemode= # Disable Java safe mode (empty value) +-d 60 # Profiling duration (seconds) +--java-no-version-check # Skip Java version check +--enable-heartbeat-server # Enable heartbeat communication +--heartbeat-interval 30 # Heartbeat frequency (seconds) +--api-server URL # Heartbeat API server URL +-o, --output-dir PATH # Local output directory +--verbose # Enable verbose logging + +# PerfSpect Hardware Metrics Options (Linux x86_64 only) +--enable-hw-metrics-collection # Enable hardware metrics via PerfSpect +--perfspect-path PATH # Path to PerfSpect binary (auto-installed in heartbeat mode) +--perfspect-duration SECONDS # PerfSpect collection duration (default: 60) +``` + +### Development Workflow + +1. **Build**: `./scripts/build_x86_64_executable.sh --fast` +2. **Test locally**: `sudo ./build/x86_64/gprofiler -o /tmp/results -d 30` +3. **View results**: Open `/tmp/results/last_flamegraph.html` in browser +4. **Test heartbeat**: Run with `--enable-heartbeat-server` flag + +### Troubleshooting Local Builds + +- **Build fails**: Ensure 16GB+ RAM available +- **Permission errors**: Run profiling commands with `sudo` +- **Docker issues**: Ensure Docker daemon is running +- **Missing dependencies**: Install build requirements with package manager diff --git a/exe-requirements.txt b/exe-requirements.txt index a5f892587..527ddc3d1 100644 --- a/exe-requirements.txt +++ b/exe-requirements.txt @@ -1,2 +1,2 @@ -pyinstaller==6.12.0 +pyinstaller==6.17.0 staticx @ git+https://github.com/Granulate/staticx.git@33eefdadc72832d5aa67c0792768c9e76afb746d; platform.machine == "x86_64" diff --git a/gprofiler/heartbeat.py b/gprofiler/heartbeat.py new file mode 100644 index 000000000..6ebc6e904 --- /dev/null +++ b/gprofiler/heartbeat.py @@ -0,0 +1,609 @@ +# +# Copyright (C) 2022 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import datetime +import logging +import os +import socket +import threading +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, Optional + +import configargparse +import requests + +# Use TYPE_CHECKING to avoid circular imports +if TYPE_CHECKING: + from gprofiler.main import GProfiler + +from gprofiler.client import ProfilerAPIClient +from gprofiler.metadata.enrichment import EnrichmentOptions +from gprofiler.metadata.system_metadata import get_hostname +from gprofiler.state import get_state +from gprofiler.usage_loggers import NoopUsageLogger +from gprofiler.utils import resource_path + +logger = logging.getLogger(__name__) + + +class HeartbeatClient: + """Client for sending heartbeats to the server and receiving profiling commands""" + + def __init__(self, api_server: str, service_name: str, server_token: str, verify: bool = True): + self.api_server = api_server.rstrip("/") + self.service_name = service_name + self.server_token = server_token + self.verify = verify + self.hostname = get_hostname() + self.ip_address = self._get_local_ip() + self.last_command_id: Optional[str] = None + self.executed_command_ids: set = set() # Track executed command IDs for idempotency (in-memory) + self.max_command_history = 1000 # Limit command history to prevent memory growth + self.session = requests.Session() + + # Set up authentication headers + if self.server_token: + self.session.headers.update( + {"Authorization": f"Bearer {self.server_token}", "Content-Type": "application/json"} + ) + + def _get_local_ip(self) -> str: + """Get the local IP address""" + try: + # Connect to a remote address to determine local IP + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception: + return "127.0.0.1" + + def send_heartbeat(self) -> Optional[Dict[str, Any]]: + """Send heartbeat to server and return any profiling commands""" + try: + heartbeat_data = { + "ip_address": self.ip_address, + "hostname": self.hostname, + "service_name": self.service_name, + "last_command_id": self.last_command_id, + "status": "active", + "timestamp": datetime.datetime.now().isoformat(), + } + + url = f"{self.api_server}/api/metrics/heartbeat" + response = self.session.post(url, json=heartbeat_data, verify=self.verify, timeout=30) + + if response.status_code == 200: + result = response.json() + + # Check if the response indicates success + if not result.get("success"): + logger.error(f"Heartbeat returned unsuccessful status: {result}") + return None + + # Check if there's a profiling command + if result.get("profiling_command"): + logger.info(f"Received profiling command from server: {result.get('command_id')}") + return result + else: + logger.debug("Heartbeat sent. No pending commands, waiting for instructions...") + return None + else: + logger.error(f"Heartbeat failed with status {response.status_code}: {response.text}") + return None + + except Exception as e: + logger.error(f"Failed to send heartbeat: {e}") + return None + + def send_command_completion( + self, + command_id: str, + status: str, + execution_time: Optional[int] = None, + error_message: Optional[str] = None, + results_path: Optional[str] = None, + ) -> bool: + """ + Send command completion status to the server. 
+ + Args: + command_id: The ID of the completed command + status: 'completed' or 'failed' + execution_time: Duration of execution in seconds + error_message: Error message if status is 'failed' + results_path: Path to profiling results if available + + Returns: + bool: True if completion was successfully reported, False otherwise + """ + try: + completion_data = { + "command_id": command_id, + "hostname": self.hostname, + "status": status, + "execution_time": execution_time, + "error_message": error_message, + "results_path": results_path, + } + + url = f"{self.api_server}/api/metrics/command_completion" + response = self.session.post(url, json=completion_data, verify=self.verify, timeout=30) + + if response.status_code == 200: + logger.info(f"Successfully reported command completion for {command_id} with status: {status}") + return True + else: + logger.error( + f"Failed to report command completion for {command_id}. " + f"Status: {response.status_code}, Response: {response.text}" + ) + return False + + except Exception as e: + logger.error(f"Failed to send command completion for {command_id}: {e}") + return False + + def mark_command_executed(self, command_id: str): + """Mark a command as executed (in-memory)""" + self.executed_command_ids.add(command_id) + + # Cleanup old command IDs if we exceed the limit + if len(self.executed_command_ids) > self.max_command_history: + self._cleanup_old_command_ids() + + logger.debug(f"Marked command ID {command_id} as executed") + + def _cleanup_old_command_ids(self): + """Remove old command IDs to prevent memory growth""" + try: + # Keep only the most recent commands (this is a simple approach) + # In production, you might want to implement time-based cleanup + if len(self.executed_command_ids) > self.max_command_history: + # Convert to list, sort, and keep the last max_command_history items + command_list = list(self.executed_command_ids) + # Since UUIDs don't sort chronologically, we'll just remove some arbitrary ones + # In a real implementation, you'd want to track timestamps + commands_to_keep = command_list[-self.max_command_history :] + self.executed_command_ids = set(commands_to_keep) + logger.info( + f"Cleaned up command ID history in memory, keeping {len(self.executed_command_ids)} entries" + ) + except Exception as e: + logger.warning(f"Failed to cleanup old command IDs: {e}") + + +class DynamicGProfilerManager: + """Manager for dynamically starting/stopping gProfiler instances based on server commands""" + + def __init__(self, base_args: configargparse.Namespace, heartbeat_client: HeartbeatClient): + self.base_args = base_args + self.heartbeat_client = heartbeat_client + self.current_gprofiler: Optional["GProfiler"] = None + self.current_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + self.heartbeat_interval = 30 # seconds + + def start_heartbeat_loop(self): + """Start the main heartbeat loop""" + logger.info("Starting heartbeat loop...") + + while not self.stop_event.is_set(): + try: + # Send heartbeat and check for commands + command_response = self.heartbeat_client.send_heartbeat() + + if command_response and command_response.get("profiling_command"): + profiling_command = command_response["profiling_command"] + command_id = command_response["command_id"] + command_type = profiling_command.get("command_type", "start") + + logger.debug(f"Processing profiling command: {profiling_command}") + + # Check for idempotency - skip if command already executed + if command_id in 
self.heartbeat_client.executed_command_ids: + logger.debug(f"Command ID {command_id} already executed, skipping...") + + # Wait for next heartbeat + self.stop_event.wait(self.heartbeat_interval) + + continue + + logger.info(f"Received {command_type} command: {command_id}") + + # Validate command type first + if command_type not in ["start", "stop"]: + logger.warning(f"Unknown command type: {command_type}") + # Mark invalid command as executed to prevent retry spam + self.heartbeat_client.mark_command_executed(command_id) + # Report completion for unknown command type + self.heartbeat_client.send_command_completion( + command_id=command_id, + status="failed", + execution_time=0, + error_message=f"Unknown command type: {command_type}", + results_path=None, + ) + continue + + # Mark valid command as executed for idempotency + self.heartbeat_client.mark_command_executed(command_id) + self.heartbeat_client.last_command_id = command_id + + try: + if command_type == "stop": + # Stop current profiler without starting a new one + logger.info(f"Executing STOP command for command ID: {command_id}") + logger.info(f"STOP command details: {profiling_command}") + self._stop_current_profiler() + + # Report completion for stop command + self.heartbeat_client.send_command_completion( + command_id=command_id, + status="completed", + execution_time=0, + error_message=None, + results_path=None, + ) + + # Log a clear message after stop is completed + logger.info("Profiling stopped. Running in heartbeat mode, waiting for commands...") + elif command_type == "start": + # Stop current profiler if running, then start new one + logger.info(f"Executing START command for command ID: {command_id}") + logger.info(f"START command details: {profiling_command}") + self._stop_current_profiler() + self._start_new_profiler(profiling_command, command_id) + + # Report command completion to the server + self.heartbeat_client.send_command_completion( + command_id=command_id, + status="completed", + execution_time=0, + error_message=None, + results_path=None, + ) + except Exception as e: + logger.error(f"Failed to execute {command_type} command {command_id}: {e}", exc_info=True) + # Report failure to the server + self.heartbeat_client.send_command_completion( + command_id=command_id, + status="failed", + execution_time=0, + error_message=str(e), + results_path=None, + ) + + # Wait for next heartbeat + self.stop_event.wait(self.heartbeat_interval) + + except Exception as e: + logger.error(f"Error in heartbeat loop: {e}", exc_info=True) + self.stop_event.wait(self.heartbeat_interval) + + def _stop_current_profiler(self): + """Stop the currently running profiler""" + if self.current_gprofiler: + logger.info("STOPPING current gProfiler instance...") + try: + self.current_gprofiler.stop() # This sets the stop_event! 
+ logger.info("Successfully called gprofiler.stop()") + except Exception as e: + # TODO: This is a huge leak, report it + logger.error(f"Error stopping gProfiler: {e}") + + # ALWAYS cleanup subprocesses regardless of stop() success/failure + try: + logger.info("Starting comprehensive cleanup after heartbeat stop...") + self.current_gprofiler.maybe_cleanup_subprocesses() + logger.info("Comprehensive cleanup completed") + except Exception as cleanup_error: + # Cleanup errors are non-fatal - log and continue + logger.info(f"Cleanup completed with minor errors (expected during stop): {cleanup_error}") + + # Always clear the reference + self.current_gprofiler = None + + if self.current_thread and self.current_thread.is_alive(): + # No need to actively kill the thread, the self.current_gprofiler.stop() already handles it using events + logger.info("Waiting for profiler thread to finish...") + self.current_thread.join(timeout=10) + self.current_thread = None + + def _start_new_profiler(self, profiling_command: Dict[str, Any], command_id: str): + """Start a new profiler with the given configuration""" + try: + # Import here to avoid circular imports + from gprofiler.main import DEFAULT_PROFILING_DURATION + + # Create modified args for the new profiler + new_args = self._create_profiler_args(profiling_command) + + # Create new GProfiler instance + self.current_gprofiler = self._create_gprofiler_instance(new_args) + + # Start profiler in a separate thread + self.current_thread = threading.Thread( + target=self._run_profiler, + args=( + self.current_gprofiler, + new_args.continuous, + getattr(new_args, "duration", DEFAULT_PROFILING_DURATION), + command_id, + ), + daemon=True, + ) + self.current_thread.start() + + logger.info(f"Started new gProfiler instance with command ID: {command_id}") + + except Exception as e: + logger.error(f"Failed to start new profiler: {e}", exc_info=True) + # Report failure to the server + self.heartbeat_client.send_command_completion( + command_id=command_id, status="failed", execution_time=0, error_message=str(e), results_path=None + ) + + def _create_profiler_args(self, profiling_command: Dict[str, Any]) -> configargparse.Namespace: + """Create modified args based on profiling command""" + # Copy base args + new_args = configargparse.Namespace(**vars(self.base_args)) + + # Update with profiling command parameters from combined_config + combined_config = profiling_command.get("combined_config", {}) + if "duration" in combined_config: + new_args.duration = combined_config["duration"] + if "frequency" in combined_config: + new_args.frequency = combined_config["frequency"] + if "profiling_mode" in combined_config: + new_args.profiling_mode = combined_config["profiling_mode"] + if "target_hostnames" in combined_config and combined_config["target_hostnames"]: + # Only profile if this hostname is in the target list or no specific targets + if self.heartbeat_client.hostname not in combined_config["target_hostnames"]: + logger.info(f"Hostname {self.heartbeat_client.hostname} not in target list, skipping profiling") + return None + if "pids" in combined_config and combined_config["pids"]: + new_args.pids_to_profile = combined_config["pids"] + + # Set continuous mode + new_args.continuous = combined_config.get("continuous", False) + + # Handle PerfSpect configuration + enable_perfspect = combined_config.get("enable_perfspect", False) + if enable_perfspect: + new_args.collect_hw_metrics = True + + # Assume PerfSpect is pre-installed as a resource + perfspect_path = 
resource_path("perfspect/perfspect") + + # Check if PerfSpect binary exists + if os.path.exists(perfspect_path) and os.access(perfspect_path, os.X_OK): + new_args.tool_perfspect_path = perfspect_path + logger.info(f"Using pre-installed PerfSpect at: {perfspect_path}") + else: + logger.error(f"PerfSpect not found at {perfspect_path}, hardware metrics disabled") + new_args.collect_hw_metrics = False + + # Handle max_processes configuration + max_processes = combined_config.get("max_processes", 10) + new_args.max_processes_per_profiler = max_processes + logger.info(f"Setting max processes per profiler: {max_processes}") + + # Handle Profiler Configurations + profiler_configs = combined_config.get("profiler_configs", {}) + if profiler_configs: + logger.info(f"Applying profiler configurations: {profiler_configs}") + + # Handle Perf Profiler configuration + perf_config = profiler_configs.get("perf", "enabled_restricted") + if perf_config == "enabled_restricted": + new_args.max_system_processes_for_system_profilers = 600 + new_args.perf_use_cgroups = True + new_args.perf_max_docker_containers = 2 + logger.info("Perf profiler: enabled restricted mode") + elif perf_config == "enabled_aggressive": + new_args.max_system_processes_for_system_profilers = 1500 + new_args.perf_use_cgroups = True + new_args.perf_max_docker_containers = 50 + logger.info("Perf profiler: enabled aggressive mode") + elif perf_config == "disabled": + new_args.perf_mode = "disabled" + logger.info("Perf profiler: disabled") + + # Handle Pyperf configuration + pyperf_config = profiler_configs.get("pyperf", "enabled") + if pyperf_config == "enabled": + new_args.python_skip_pyperf_profiler_above = 1500 + new_args.python_mode = "pyperf" + logger.info("Pyperf profiler: enabled") + elif pyperf_config == "disabled": + new_args.python_mode = "disabled" + logger.info("Pyperf profiler: disabled, using pyspy") + + # Handle Pyspy configuration + pyspy_config = profiler_configs.get("pyspy", "enabled_fallback") + if pyspy_config == "enabled_fallback": + new_args.python_mode = "auto" + logger.info("Pyspy profiler: enabled as fallback") + elif pyspy_config == "enabled": + new_args.python_mode = "pyspy" + logger.info("Pyspy profiler: enabled") + elif pyspy_config == "disabled" and pyperf_config == "disabled": + new_args.python_mode = "disabled" + logger.info("Pyspy profiler: disabled") + + # Handle Java Async Profiler configuration + async_profiler_config = profiler_configs.get("async_profiler", "enabled") + if async_profiler_config == "disabled": + new_args.java_mode = "disabled" + logger.info("Java async profiler: disabled") + else: + logger.info("Java async profiler: enabled") + + # Handle PHP configuration + phpspy_config = profiler_configs.get("phpspy", "enabled") + if phpspy_config == "disabled": + new_args.php_mode = "disabled" + logger.info("PHP profiler: disabled") + else: + logger.info("PHP profiler: enabled") + + # Handle Ruby configuration + rbspy_config = profiler_configs.get("rbspy", "enabled") + if rbspy_config == "disabled": + new_args.ruby_mode = "disabled" + logger.info("Ruby profiler: disabled") + else: + logger.info("Ruby profiler: enabled") + + # Handle .NET configuration + dotnet_config = profiler_configs.get("dotnet_trace", "enabled") + if dotnet_config == "disabled": + new_args.dotnet_mode = "disabled" + logger.info(".NET profiler: disabled") + else: + logger.info(".NET profiler: enabled") + + # Handle NodeJS configuration + nodejs_config = profiler_configs.get("nodejs_perf", "enabled") + if nodejs_config == "disabled": + 
new_args.nodejs_mode = "none"
+                logger.info("NodeJS profiler: disabled")
+            else:
+                logger.info("NodeJS profiler: enabled")
+
+        return new_args
+
+    def _create_gprofiler_instance(self, args: configargparse.Namespace) -> "GProfiler":
+        """Create a new GProfiler instance with the given args"""
+        if args is None:
+            return None
+
+        # Import here to avoid circular imports
+        from gprofiler.main import GProfiler, pids_to_processes
+
+        processes_to_profile = pids_to_processes(args)
+        state = get_state()
+
+        # Create profiler API client
+        profiler_api_client = None
+        if args.upload_results:
+            profiler_api_client = ProfilerAPIClient(
+                token=args.server_token,
+                service_name=args.service_name,
+                server_address=args.server_host,
+                curlify_requests=getattr(args, "curlify_requests", False),
+                hostname=get_hostname(),
+                verify=args.verify,
+                upload_timeout=getattr(args, "server_upload_timeout", 120),  # Default to 120 seconds
+            )
+
+        enrichment_options = EnrichmentOptions(
+            profile_api_version=args.profile_api_version,
+            container_names=args.container_names,
+            application_identifiers=args.collect_appids,
+            application_identifier_args_filters=args.app_id_args_filters,
+            application_metadata=args.application_metadata,
+        )
+
+        # Create external metadata path if specified
+        external_metadata_path = None
+        if hasattr(args, "external_metadata") and args.external_metadata:
+            external_metadata_path = Path(args.external_metadata)
+
+        # Create heartbeat file path if specified
+        heartbeat_file_path = None
+        if hasattr(args, "heartbeat_file") and args.heartbeat_file:
+            heartbeat_file_path = Path(args.heartbeat_file)
+
+        # Create perfspect path if specified
+        perfspect_path = None
+        if hasattr(args, "tool_perfspect_path") and args.tool_perfspect_path:
+            perfspect_path = Path(args.tool_perfspect_path)
+
+        return GProfiler(
+            output_dir=getattr(args, "output_dir", None),
+            flamegraph=getattr(args, "flamegraph", True),
+            rotating_output=getattr(args, "rotating_output", False),
+            rootless=getattr(args, "rootless", False),
+            profiler_api_client=profiler_api_client,
+            collect_metrics=getattr(args, "collect_metrics", True),
+            collect_metadata=getattr(args, "collect_metadata", True),
+            enrichment_options=enrichment_options,
+            state=state,
+            usage_logger=NoopUsageLogger(),  # Simplified for dynamic profiling
+            user_args=args.__dict__,
+            duration=args.duration,
+            profile_api_version=args.profile_api_version,
+            profiling_mode=args.profiling_mode,
+            collect_hw_metrics=getattr(args, "collect_hw_metrics", False),
+            profile_spawned_processes=getattr(args, "profile_spawned_processes", False),
+            remote_logs_handler=None,  # Simplified for dynamic profiling
+            controller_process=None,
+            processes_to_profile=processes_to_profile,
+            external_metadata_path=external_metadata_path,
+            heartbeat_file_path=heartbeat_file_path,
+            perfspect_path=perfspect_path,
+            perfspect_duration=getattr(args, "tool_perfspect_duration", 60),
+        )
+
+    def _run_profiler(self, gprofiler: "GProfiler", continuous: bool, duration: int, command_id: str):
+        """Run the profiler with specified args"""
+        if gprofiler is None:
+            return
+
+        start_time = datetime.datetime.now()
+
+        try:
+            if continuous:
+                logger.info(f"Running continuous profiler for command ID: {command_id}")
+                gprofiler.run_continuous()
+            else:
+                logger.info(f"Running profiler for {duration} seconds (command ID: {command_id})...")
+                gprofiler.run_single()
+
+            # After run completes, check if it was stopped or completed
+            if gprofiler._profiler_state.stop_event.is_set():
+                logger.info(f"Profiler run was 
stopped before completion for command ID: {command_id}") + else: + logger.info(f"Profiler run completed successfully for command ID: {command_id}") + + # Try to get results path if available + if hasattr(gprofiler, "output_dir") and gprofiler.output_dir: + _ = str(gprofiler.output_dir) # Available for future use + + except Exception as e: + # Internal exceptions can occur during profiling stop + # Only consider a failure if it was not due to a stop event + if not gprofiler._profiler_state.stop_event.is_set(): + _ = str(e) # Available for future error reporting + logger.error(f"Profiler run failed for command ID {command_id}: {e}", exc_info=True) + else: + logger.info(f"Profiler run was stopped before completion for command ID: {command_id}") + + finally: + # Calculate execution time + end_time = datetime.datetime.now() + _ = int((end_time - start_time).total_seconds()) # Available for future use + + # Clear the current profiler reference + if self.current_gprofiler == gprofiler: + self.current_gprofiler = None + + def stop(self): + """Stop the heartbeat manager""" + logger.info("Stopping heartbeat manager...") + self.stop_event.set() + self._stop_current_profiler() diff --git a/gprofiler/main.py b/gprofiler/main.py index ca932a267..e832f5c44 100644 --- a/gprofiler/main.py +++ b/gprofiler/main.py @@ -33,7 +33,7 @@ from granulate_utils.linux.ns import is_root, is_running_in_init_pid from granulate_utils.linux.process import is_process_running from granulate_utils.metadata.cloud import get_aws_execution_env -from psutil import NoSuchProcess, Process +from psutil import NoSuchProcess, Process, process_iter from requests import RequestException, Timeout from gprofiler import __version__ @@ -48,6 +48,7 @@ from gprofiler.diagnostics import log_diagnostics, set_diagnostics from gprofiler.exceptions import APIError, NoProfilersEnabledError from gprofiler.gprofiler_types import ProcessToProfileData, UserArgs, integers_list, positive_integer +from gprofiler.heartbeat import DynamicGProfilerManager, HeartbeatClient from gprofiler.hw_metrics import HWMetricsMonitor, HWMetricsMonitorBase, NoopHWMetricsMonitor from gprofiler.log import RemoteLogsHandler, initial_root_logger_setup from gprofiler.merge import concatenate_from_external_file, concatenate_profiles, merge_profiles @@ -167,6 +168,8 @@ def __init__( profiling_mode=profiling_mode, container_names_client=container_names_client, processes_to_profile=processes_to_profile, + max_processes_per_profiler=user_args.get("max_processes_per_profiler", 0), + max_system_processes_for_system_profilers=user_args.get("max_system_processes_for_system_profilers", 0), ) self.system_profiler, self.process_profilers = get_profilers(user_args, profiler_state=self._profiler_state) self._usage_logger = usage_logger @@ -279,8 +282,37 @@ def start(self) -> None: self._system_metrics_monitor.start() self._hw_metrics_monitor.start() + # Check if system should skip continuous profilers due to process count + skip_system_profilers = False + if self._profiler_state.max_system_processes_for_system_profilers > 0: + try: + total_processes = len(list(process_iter())) + if total_processes > self._profiler_state.max_system_processes_for_system_profilers: + skip_system_profilers = True + logger.warning( + f"Skipping system profilers (perf) - {total_processes} processes exceed threshold " + f"of {self._profiler_state.max_system_processes_for_system_profilers}. " + f"Runtime profilers (py-spy, Java, etc.) will continue normally." 
+ ) + else: + logger.debug( + f"System process count: {total_processes} " + f"(threshold: {self._profiler_state.max_system_processes_for_system_profilers})" + ) + except Exception as e: + logger.warning(f"Could not count system processes, continuing with all profilers: {e}") + for prof in list(self.all_profilers): try: + # Skip system profilers if threshold exceeded + if ( + skip_system_profilers + and hasattr(prof, "_is_system_wide_profiler") + and prof._is_system_wide_profiler() + ): + logger.info(f"Skipping {prof.__class__.__name__} due to high system process count") + continue + prof.start() except Exception: # the SystemProfiler is handled separately - let the user run with '--perf-mode none' if they @@ -594,6 +626,25 @@ def parse_cmd_args() -> configargparse.Namespace: help="Comma separated list of processes that will be filtered to profile," " given multiple times will append pids to one list", ) + parser.add_argument( + "--max-processes-runtime-profiler", + dest="max_processes_per_profiler", + type=positive_integer, + default=0, + help="Maximum number of processes to profile per runtime profiler (0=unlimited). " + "When exceeded, profiles only the top N processes by CPU usage. " + "Does not affect system-wide profilers (perf, eBPF). Default: %(default)s", + ) + parser.add_argument( + "--skip-system-profilers-above", + dest="max_system_processes_for_system_profilers", + type=positive_integer, + default=0, + help="Skip system-wide profilers (perf only) when total system processes exceed this threshold (0=unlimited). " + "When exceeded, prevents perf profiler from starting to reduce resource usage on busy systems. " + "PyPerf has its own threshold via --python-skip-pyperf-profiler-above. " + "Runtime profilers (py-spy, Java, etc.) continue normally with --max-processes limiting. 
Default: %(default)s", + ) parser.add_argument( "--rootless", action="store_true", @@ -861,6 +912,22 @@ def parse_cmd_args() -> configargparse.Namespace: "The file modification indicates the last snapshot time.", ) + parser.add_argument( + "--enable-heartbeat-server", + action="store_true", + dest="enable_heartbeat_server", + default=False, + help="Enable heartbeat communication with server for dynamic profiling commands", + ) + + parser.add_argument( + "--heartbeat-interval", + type=positive_integer, + dest="heartbeat_interval", + default=30, + help="Interval in seconds for sending heartbeats to server (default: %(default)s)", + ) + if is_linux() and not is_aarch64(): hw_metrics_options = parser.add_argument_group("hardware metrics") hw_metrics_options.add_argument( @@ -936,6 +1003,14 @@ def parse_cmd_args() -> configargparse.Namespace: if args.profile_spawned_processes and args.pids_to_profile is not None: parser.error("--pids is not allowed when profiling spawned processes") + if args.enable_heartbeat_server: + if not args.upload_results: + parser.error("--enable-heartbeat-server requires --upload-results to be enabled") + if not args.server_token: + parser.error("--enable-heartbeat-server requires --token to be provided") + if not args.service_name: + parser.error("--enable-heartbeat-server requires --service-name to be provided") + return args @@ -1215,37 +1290,61 @@ def main() -> None: ApplicationIdentifiers.init(enrichment_options) set_diagnostics(args.diagnostics) - gprofiler = GProfiler( - output_dir=args.output_dir, - flamegraph=args.flamegraph, - rotating_output=args.rotating_output, - rootless=args.rootless, - profiler_api_client=profiler_api_client, - collect_metrics=args.collect_metrics, - collect_metadata=args.collect_metadata, - enrichment_options=enrichment_options, - state=state, - usage_logger=usage_logger, - user_args=args.__dict__, - duration=args.duration, - profile_api_version=args.profile_api_version, - profiling_mode=args.profiling_mode, - collect_hw_metrics=getattr(args, "collect_hw_metrics", False), - profile_spawned_processes=args.profile_spawned_processes, - remote_logs_handler=remote_logs_handler, - controller_process=controller_process, - processes_to_profile=processes_to_profile, - external_metadata_path=external_metadata_path, - heartbeat_file_path=heartbeat_file_path, - perfspect_path=perfspect_path, - perfspect_duration=getattr(args, "tool_perfspect_duration", 60), - verbose=args.verbose, - ) - logger.info("gProfiler initialized and ready to start profiling") - if args.continuous: - gprofiler.run_continuous() + + # Check if heartbeat server mode is enabled FIRST + if args.enable_heartbeat_server: + # Create heartbeat client + heartbeat_client = HeartbeatClient( + api_server=args.api_server, + service_name=args.service_name, + server_token=args.server_token, + verify=args.verify, + ) + + # Create dynamic profiler manager + manager = DynamicGProfilerManager(args, heartbeat_client) + manager.heartbeat_interval = args.heartbeat_interval + + try: + logger.info("Starting heartbeat mode - waiting for server commands...") + manager.start_heartbeat_loop() + except KeyboardInterrupt: + logger.info("Received interrupt signal, stopping heartbeat mode...") + finally: + manager.stop() else: - gprofiler.run_single() + # Normal profiling mode + gprofiler = GProfiler( + output_dir=args.output_dir, + flamegraph=args.flamegraph, + rotating_output=args.rotating_output, + rootless=args.rootless, + profiler_api_client=profiler_api_client, + 
collect_metrics=args.collect_metrics, + collect_metadata=args.collect_metadata, + enrichment_options=enrichment_options, + state=state, + usage_logger=usage_logger, + user_args=args.__dict__, + duration=args.duration, + profile_api_version=args.profile_api_version, + profiling_mode=args.profiling_mode, + collect_hw_metrics=getattr(args, "collect_hw_metrics", False), + profile_spawned_processes=args.profile_spawned_processes, + remote_logs_handler=remote_logs_handler, + controller_process=controller_process, + processes_to_profile=processes_to_profile, + external_metadata_path=external_metadata_path, + heartbeat_file_path=heartbeat_file_path, + perfspect_path=perfspect_path, + perfspect_duration=getattr(args, "tool_perfspect_duration", 60), + verbose=args.verbose, + ) + logger.info("gProfiler initialized and ready to start profiling") + if args.continuous: + gprofiler.run_continuous() + else: + gprofiler.run_single() except KeyboardInterrupt: pass diff --git a/gprofiler/profiler_state.py b/gprofiler/profiler_state.py index 58597778d..64fa64314 100644 --- a/gprofiler/profiler_state.py +++ b/gprofiler/profiler_state.py @@ -25,6 +25,8 @@ class ProfilerState: profiling_mode: str container_names_client: Optional[ContainerNamesClient] processes_to_profile: Optional[List[Process]] + max_processes_per_profiler: int + max_system_processes_for_system_profilers: int def __post_init__(self) -> None: self._temporary_dir = TemporaryDirectoryWithMode(dir=self.storage_dir, mode=0o755) diff --git a/gprofiler/profilers/perf.py b/gprofiler/profilers/perf.py index d48676e07..28a60fba1 100644 --- a/gprofiler/profilers/perf.py +++ b/gprofiler/profilers/perf.py @@ -125,6 +125,30 @@ def add_highest_avg_depth_stacks_per_process( action="store_false", dest="perf_memory_restart", ), + ProfilerArgument( + "--perf-use-cgroups", + help="Use cgroup-based profiling instead of PID-based profiling for better reliability. " + "Profiles the top N cgroups by resource usage, avoiding crashes from invalid PIDs.", + action="store_true", + default=False, + dest="perf_use_cgroups", + ), + ProfilerArgument( + "--perf-max-cgroups", + help="Maximum number of cgroups to profile when using --perf-use-cgroups. Default: %(default)s", + type=int, + default=50, + dest="perf_max_cgroups", + ), + ProfilerArgument( + "--perf-max-docker-containers", + help="Maximum number of individual Docker containers to profile instead of the broad 'docker' cgroup. " + "When set, profiles the top N highest-resource individual containers rather than all containers together. " + "Set to 0 to use the broad 'docker' cgroup (default behavior). Default: %(default)s", + type=int, + default=0, + dest="perf_max_docker_containers", + ), ], disablement_help="Disable the global perf of processes," " and instead only concatenate runtime-specific profilers results", @@ -138,6 +162,10 @@ class SystemProfiler(ProfilerBase): versions of Go processes. 
""" + def _is_system_wide_profiler(self) -> bool: + """Perf is a system-wide profiler that can be disabled on busy systems.""" + return True + def __init__( self, frequency: int, @@ -148,6 +176,9 @@ def __init__( perf_inject: bool, perf_node_attach: bool, perf_memory_restart: bool, + perf_use_cgroups: bool = False, + perf_max_cgroups: int = 50, + perf_max_docker_containers: int = 0, min_duration: int = 0, ): super().__init__(frequency, duration, profiler_state, min_duration) @@ -159,6 +190,12 @@ def __init__( self._node_processes: List[Process] = [] self._node_processes_attached: List[Process] = [] self._perf_memory_restart = perf_memory_restart + self._perf_mode = perf_mode + self._perf_dwarf_stack_size = perf_dwarf_stack_size + self._perf_inject = perf_inject + self._perf_use_cgroups = perf_use_cgroups + self._perf_max_cgroups = perf_max_cgroups + self._perf_max_docker_containers = perf_max_docker_containers switch_timeout_s = duration * 3 # allow gprofiler to be delayed up to 3 intervals before timing out. extra_args = [] try: @@ -184,6 +221,9 @@ def __init__( extra_args=extra_args, processes_to_profile=self._profiler_state.processes_to_profile, switch_timeout_s=switch_timeout_s, + use_cgroups=self._perf_use_cgroups, + max_cgroups=self._perf_max_cgroups, + max_docker_containers=self._perf_max_docker_containers, ) self._perfs.append(self._perf_fp) else: @@ -200,6 +240,9 @@ def __init__( extra_args=extra_args, processes_to_profile=self._profiler_state.processes_to_profile, switch_timeout_s=switch_timeout_s, + use_cgroups=self._perf_use_cgroups, + max_cgroups=self._perf_max_cgroups, + max_docker_containers=self._perf_max_docker_containers, ) self._perfs.append(self._perf_dwarf) else: diff --git a/gprofiler/profilers/profiler_base.py b/gprofiler/profilers/profiler_base.py index 78d2ba2c4..61e26103e 100644 --- a/gprofiler/profilers/profiler_base.py +++ b/gprofiler/profilers/profiler_base.py @@ -16,6 +16,7 @@ import concurrent.futures import contextlib +import logging import os import sched import time @@ -180,6 +181,65 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr def _notify_selected_processes(self, processes: List[Process]) -> None: pass + def _should_limit_processes(self) -> bool: + """ + Override this in profilers that should NOT respect the max_processes_per_profiler limit. + System-wide profilers (perf, eBPF) should return False. + Runtime profilers (py-spy, Java, Ruby, etc.) should return True (default). + """ + return True + + def _is_system_wide_profiler(self) -> bool: + """ + Override this in system-wide profilers (perf, eBPF) to return True. + These profilers can be disabled when system has too many processes. + """ + return False + + def _get_top_processes_by_cpu(self, processes: List[Process], max_processes: int) -> List[Process]: + """ + Filter processes to the top N by CPU usage to reduce memory consumption. 
+ + Args: + processes: List of processes to filter + max_processes: Maximum number of processes to return + + Returns: + List of top N processes by CPU usage, or all processes if max_processes <= 0 + """ + if max_processes <= 0 or len(processes) <= max_processes: + return processes + + logger.info( + f"{self.__class__.__name__}: Limiting to top {max_processes} processes " + f"(from {len(processes)}) by CPU usage to reduce memory consumption" + ) + + # Get CPU usage for each process, handling exceptions gracefully + processes_with_cpu = [] + for process in processes: + try: + # Use short interval for CPU measurement to avoid blocking + cpu_percent = process.cpu_percent(interval=0.1) + processes_with_cpu.append((process, cpu_percent)) + except (NoSuchProcess, ZombieProcess, PermissionError): + # Process may have died or we don't have permission + # Still include it with 0% CPU so it's considered but deprioritized + processes_with_cpu.append((process, 0.0)) + except Exception as e: + logger.debug(f"Error getting CPU usage for process {process.pid}: {e}") + processes_with_cpu.append((process, 0.0)) + + # Sort by CPU usage (descending) and take top N + processes_with_cpu.sort(key=lambda x: x[1], reverse=True) + top_processes = [proc for proc, cpu in processes_with_cpu[:max_processes]] + + if logger.isEnabledFor(logging.DEBUG): + top_cpu_info = [(proc.pid, cpu) for proc, cpu in processes_with_cpu[: min(5, max_processes)]] + logger.debug(f"{self.__class__.__name__}: Selected top processes by CPU: {top_cpu_info}") + + return top_processes + def _get_process_age(self, process: Process) -> float: """Get the age of a process in seconds.""" try: @@ -206,6 +266,13 @@ def snapshot(self) -> ProcessToProfileData: process for process in processes_to_profile if process in self._profiler_state.processes_to_profile ] logger.debug(f"{self.__class__.__name__}: processes left after filtering: {len(processes_to_profile)}") + + # Apply max_processes_per_profiler limit for runtime profilers (not system-wide profilers) + if self._should_limit_processes() and self._profiler_state.max_processes_per_profiler > 0: + processes_to_profile = self._get_top_processes_by_cpu( + processes_to_profile, self._profiler_state.max_processes_per_profiler + ) + self._notify_selected_processes(processes_to_profile) if not processes_to_profile: diff --git a/gprofiler/profilers/python.py b/gprofiler/profilers/python.py index 61d47e863..42d7bf4bf 100644 --- a/gprofiler/profilers/python.py +++ b/gprofiler/profilers/python.py @@ -45,6 +45,7 @@ StackToSampleCount, integers_list, nonnegative_integer, + positive_integer, ) from gprofiler.log import get_logger_adapter from gprofiler.metadata import application_identifiers @@ -378,6 +379,16 @@ def _should_skip_process(self, process: Process) -> bool: " they are not recognized by gProfiler as Python processes." " Note - gProfiler assumes that the given processes are kept running as long as gProfiler runs.", ), + ProfilerArgument( + name="--python-skip-pyperf-profiler-above", + dest="python_skip_pyperf_profiler_above", + type=positive_integer, + default=0, + help="Skip PyPerf (eBPF Python profiler) when Python processes exceed this threshold (0=unlimited). " + "When exceeded, prevents PyPerf from starting but allows py-spy fallback for Python profiling. " + "This provides fine-grained control over PyPerf resource usage independent of system profilers. 
" + "Default: %(default)s", + ), ], supported_profiling_modes=["cpu"], ) @@ -398,6 +409,7 @@ def __init__( python_pyperf_verbose: bool, python_pyspy_process: List[int], min_duration: int = 0, + python_skip_pyperf_profiler_above: int = 0, ): if python_mode == "py-spy": python_mode = "pyspy" @@ -422,6 +434,7 @@ def __init__( python_pyperf_user_stacks_pages, python_pyperf_verbose, min_duration, + python_skip_pyperf_profiler_above, ) else: self._ebpf_profiler = None @@ -449,6 +462,7 @@ def _create_ebpf_profiler( user_stacks_pages: Optional[int], verbose: bool, min_duration: int, + python_skip_pyperf_profiler_above: int, ) -> Optional[PythonEbpfProfiler]: try: profiler = PythonEbpfProfiler( @@ -459,6 +473,7 @@ def _create_ebpf_profiler( user_stacks_pages=user_stacks_pages, verbose=verbose, min_duration=min_duration, + python_skip_pyperf_profiler_above=python_skip_pyperf_profiler_above, ) profiler.test() return profiler @@ -468,6 +483,18 @@ def _create_ebpf_profiler( return None def start(self) -> None: + # Check PyPerf-specific skip logic first + if self._ebpf_profiler is not None: + if self._ebpf_profiler.should_skip_due_to_python_threshold(): + # Skip PyPerf but keep py-spy as fallback + logger.info("PyPerf skipped due to Python process threshold, falling back to py-spy") + self._ebpf_profiler = None + + # Ensure py-spy profiler exists as fallback + if self._pyspy_profiler is None: + logger.warning("PyPerf skipped but no py-spy fallback available") + + # Start the appropriate profiler if self._ebpf_profiler is not None: self._ebpf_profiler.start() elif self._pyspy_profiler is not None: diff --git a/gprofiler/profilers/python_ebpf.py b/gprofiler/profilers/python_ebpf.py index cf76ca698..aae0792ec 100644 --- a/gprofiler/profilers/python_ebpf.py +++ b/gprofiler/profilers/python_ebpf.py @@ -78,6 +78,7 @@ def __init__( user_stacks_pages: Optional[int] = None, verbose: bool, min_duration: int = 0, + python_skip_pyperf_profiler_above: int = 0, ): super().__init__(frequency, duration, profiler_state, min_duration) self.process: Optional[Popen] = None @@ -89,10 +90,64 @@ def __init__( self._metadata = python.PythonMetadata(self._profiler_state.stop_event) self._verbose = verbose self._pyperf_staticx_tmpdir: Optional[Path] = None + self._python_skip_pyperf_profiler_above = python_skip_pyperf_profiler_above if os.environ.get("TMPDIR", None) is not None: # We want to create a new level of hirerachy in our current staticx tempdir. self._pyperf_staticx_tmpdir = Path(os.environ["TMPDIR"]) / ("pyperf_" + random_prefix()) + def _count_python_processes(self) -> int: + """ + Count Python processes using the same detection logic as py-spy. + This ensures consistent counting between PyPerf skip logic and py-spy process selection. + """ + try: + from gprofiler.utils import pgrep_exe, pgrep_maps + + # Count all processes that match Python detection criteria + python_pattern = "python" + python_processes = set() + + # Check via maps (memory mappings contain libpython) + try: + python_processes.update(pgrep_maps(python_pattern)) + except Exception: + pass + + # Check via executable name + try: + python_processes.update(pgrep_exe(python_pattern)) + except Exception: + pass + + return len(python_processes) + except Exception as e: + logger.debug(f"Error counting Python processes: {e}") + return 0 + + def should_skip_due_to_python_threshold(self) -> bool: + """ + Check if PyPerf should be skipped due to too many Python processes. + This provides PyPerf-specific resource management independent of system profiler logic. 
+ """ + if self._python_skip_pyperf_profiler_above <= 0: + return False # No threshold set, don't skip + + python_process_count = self._count_python_processes() + should_skip = python_process_count > self._python_skip_pyperf_profiler_above + + if should_skip: + logger.info( + f"Skipping PyPerf - {python_process_count} Python processes exceed threshold " + f"of {self._python_skip_pyperf_profiler_above}. py-spy fallback will be used for Python profiling." + ) + else: + logger.debug( + f"PyPerf: Python process count {python_process_count} " + f"(threshold: {self._python_skip_pyperf_profiler_above})" + ) + + return should_skip + @classmethod def _check_output(cls, process: Popen, output_path: Path) -> None: if not glob.glob(f"{str(output_path)}.*"): diff --git a/gprofiler/utils/cgroup_utils.py b/gprofiler/utils/cgroup_utils.py new file mode 100644 index 000000000..7b6090926 --- /dev/null +++ b/gprofiler/utils/cgroup_utils.py @@ -0,0 +1,534 @@ +# +# Copyright (C) 2025 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import os +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional + +logger = logging.getLogger(__name__) + + +class CgroupVersion(Enum): + """Cgroup version enumeration""" + + V1 = "v1" + V2 = "v2" + UNKNOWN = "unknown" + + +@dataclass +class CgroupResourceUsage: + """Represents resource usage for a cgroup""" + + cgroup_path: str + name: str + cpu_usage: int # CPU usage in nanoseconds + memory_usage: int # Memory usage in bytes + + @property + def total_score(self) -> float: + """Calculate a combined score for ranking cgroups by resource usage + + Prioritizes CPU usage over memory since CPU indicates active processes + that are more interesting for profiling. 
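+
+        Equivalently (mirrors the code below):
+        score = (cpu_usage / 1e9) * 10 + memory_usage / (1024 * 1024)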
+ """ + # Normalize CPU (ns) and memory (bytes) to comparable scales + cpu_score = self.cpu_usage / 1_000_000_000 # ns to seconds + memory_score = self.memory_usage / (1024 * 1024) # bytes to MB + + # Weight CPU heavily (10x) since active CPU usage is more important for profiling + # than static memory usage + return (cpu_score * 10) + memory_score + + +def detect_cgroup_version() -> CgroupVersion: + """Detect which cgroup version is in use for Docker containers""" + try: + # Check if Docker containers are using cgroup v1 paths (hybrid systems) + if os.path.exists("/sys/fs/cgroup/memory/docker") or os.path.exists("/sys/fs/cgroup/cpu,cpuacct/docker"): + return CgroupVersion.V1 + + # Check if cgroup v2 is mounted and being used + with open("/proc/mounts", "r") as f: + mounts = f.read() + if "cgroup2" in mounts and "/sys/fs/cgroup" in mounts: + # Check if Docker containers exist in v2 paths + v2_docker_paths = ["/sys/fs/cgroup/system.slice", "/sys/fs/cgroup/docker"] + for path in v2_docker_paths: + if os.path.exists(path): + try: + entries = os.listdir(path) + if any("docker" in entry.lower() for entry in entries): + return CgroupVersion.V2 + except (OSError, PermissionError): + continue + + # If cgroup2 is mounted but no Docker containers found in v2, check v1 + if "/sys/fs/cgroup/memory" in mounts or "/sys/fs/cgroup/cpu" in mounts: + return CgroupVersion.V1 + else: + return CgroupVersion.V2 + elif "cgroup" in mounts and ("/sys/fs/cgroup/memory" in mounts or "/sys/fs/cgroup/cpu" in mounts): + return CgroupVersion.V1 + except (IOError, OSError) as e: + logger.debug(f"Failed to read /proc/mounts: {e}") + + # Fallback: check filesystem structure + if os.path.exists("/sys/fs/cgroup/memory") or os.path.exists("/sys/fs/cgroup/cpu,cpuacct"): + return CgroupVersion.V1 + elif os.path.exists("/sys/fs/cgroup/cgroup.controllers"): + return CgroupVersion.V2 + + return CgroupVersion.UNKNOWN + + +def is_cgroup_available() -> bool: + """Check if cgroup filesystem is available and mounted""" + return os.path.exists("/sys/fs/cgroup") and detect_cgroup_version() != CgroupVersion.UNKNOWN + + +def get_cgroup_cpu_usage(cgroup_path: str) -> Optional[int]: + """Get CPU usage for a cgroup in nanoseconds""" + cgroup_version = detect_cgroup_version() + + if cgroup_version == CgroupVersion.V2: + # cgroup v2 uses cpu.stat file + cpu_stat_file = os.path.join(cgroup_path, "cpu.stat") + if os.path.exists(cpu_stat_file): + try: + with open(cpu_stat_file, "r") as f: + for line in f: + if line.startswith("usage_usec "): + # Convert microseconds to nanoseconds + return int(line.split()[1]) * 1000 + except (IOError, ValueError) as e: + logger.debug(f"Failed to read CPU usage from {cpu_stat_file}: {e}") + return None + + else: # cgroup v1 + usage_file = os.path.join(cgroup_path, "cpuacct.usage") + if not os.path.exists(usage_file): + # Try alternative path + alt_path = cgroup_path.replace("/cpu,cpuacct/", "/cpuacct/") + usage_file = os.path.join(alt_path, "cpuacct.usage") + if not os.path.exists(usage_file): + return None + + try: + with open(usage_file, "r") as f: + return int(f.read().strip()) + except (IOError, ValueError) as e: + logger.debug(f"Failed to read CPU usage from {usage_file}: {e}") + return None + + +def get_cgroup_memory_usage(cgroup_path: str) -> Optional[int]: + """Get memory usage for a cgroup in bytes""" + cgroup_version = detect_cgroup_version() + + if cgroup_version == CgroupVersion.V2: + # cgroup v2 uses memory.current file + usage_file = os.path.join(cgroup_path, "memory.current") + else: # cgroup v1 + 
usage_file = os.path.join(cgroup_path, "memory.usage_in_bytes") + + if not os.path.exists(usage_file): + return None + + try: + with open(usage_file, "r") as f: + return int(f.read().strip()) + except (IOError, ValueError) as e: + logger.debug(f"Failed to read memory usage from {usage_file}: {e}") + return None + + +def find_all_cgroups() -> List[str]: + """Find all available cgroups in the system""" + cgroups = [] + cgroup_version = detect_cgroup_version() + + if cgroup_version == CgroupVersion.V2: + # cgroup v2 unified hierarchy + base = "/sys/fs/cgroup" + try: + for root, dirs, files in os.walk(base): + # Skip the root directory itself + if root == base: + continue + + # Check if this directory has the necessary files for v2 + cpu_file = os.path.join(root, "cpu.stat") + memory_file = os.path.join(root, "memory.current") + + if os.path.exists(cpu_file) or os.path.exists(memory_file): + cgroups.append(root) + except OSError as e: + logger.debug(f"Error walking cgroup v2 directory {base}: {e}") + + else: # cgroup v1 + # Common cgroup mount points to check + cgroup_bases = [ + "/sys/fs/cgroup/cpu,cpuacct", + "/sys/fs/cgroup/memory", + "/sys/fs/cgroup/cpuacct", + ] + + for base in cgroup_bases: + if os.path.exists(base): + try: + # Walk through all subdirectories + for root, dirs, files in os.walk(base): + # Skip the base directory itself + if root == base: + continue + + # Check if this directory has the necessary files + cpu_file = os.path.join(root, "cpuacct.usage") + memory_file = root.replace("/cpu,cpuacct/", "/memory/") + "/memory.usage_in_bytes" + + if os.path.exists(cpu_file) or os.path.exists(memory_file): + cgroups.append(root) + except OSError as e: + logger.debug(f"Error walking cgroup directory {base}: {e}") + continue + + return list(set(cgroups)) # Remove duplicates + + +def get_cgroup_resource_usage(cgroup_path: str) -> Optional[CgroupResourceUsage]: + """Get resource usage for a single cgroup""" + cpu_usage = get_cgroup_cpu_usage(cgroup_path) + + # For memory, try to find the corresponding memory cgroup path + memory_path = cgroup_path.replace("/cpu,cpuacct/", "/memory/") + if not os.path.exists(memory_path): + memory_path = cgroup_path.replace("/cpuacct/", "/memory/") + + memory_usage = get_cgroup_memory_usage(memory_path) + + # If we can't get any usage data, skip this cgroup + if cpu_usage is None and memory_usage is None: + return None + + # Use 0 as default if one metric is missing + cpu_usage = cpu_usage or 0 + memory_usage = memory_usage or 0 + + # Extract a readable name from the path + name = os.path.basename(cgroup_path) + if len(name) > 12: # Truncate long container IDs + name = name[:12] + + return CgroupResourceUsage(cgroup_path=cgroup_path, name=name, cpu_usage=cpu_usage, memory_usage=memory_usage) + + +def get_top_cgroups_by_usage(limit: int = 50) -> List[CgroupResourceUsage]: + """Get the top N cgroups by resource usage""" + if not is_cgroup_available(): + logger.warning("Cgroup filesystem not available") + return [] + + all_cgroups = find_all_cgroups() + logger.debug(f"Found {len(all_cgroups)} cgroups to analyze") + + cgroup_usages = [] + for cgroup_path in all_cgroups: + usage = get_cgroup_resource_usage(cgroup_path) + if usage: + cgroup_usages.append(usage) + + # Sort by total resource usage score (descending) + cgroup_usages.sort(key=lambda x: x.total_score, reverse=True) + + logger.debug(f"Analyzed {len(cgroup_usages)} cgroups with resource data") + + return cgroup_usages[:limit] + + +def cgroup_to_perf_name(cgroup_path: str) -> str: + """Convert a 
cgroup path to the name format expected by perf -G option""" + # perf expects the cgroup name relative to the cgroup mount point + # For example: /sys/fs/cgroup/memory/docker/abc123 -> docker/abc123 + + # Find the relative path from the cgroup mount point + for base in ["/sys/fs/cgroup/memory/", "/sys/fs/cgroup/cpu,cpuacct/", "/sys/fs/cgroup/cpuacct/"]: + if cgroup_path.startswith(base): + return cgroup_path[len(base) :] + + # Fallback: just use the basename + return os.path.basename(cgroup_path) + + +def convert_cgroupv2_path_to_perf_name(cgroup_path: str) -> str: + """Convert a cgroup v2 path to perf-compatible name""" + # Remove the base cgroup path + if cgroup_path.startswith("/sys/fs/cgroup/"): + relative_path = cgroup_path[len("/sys/fs/cgroup/") :] + else: + relative_path = cgroup_path + + # Handle Docker container paths in cgroup v2 + if "docker-" in relative_path and ".scope" in relative_path: + # Extract container ID from system.slice/docker-.scope + import re + + match = re.search(r"docker-([a-f0-9]{64})\.scope", relative_path) + if match: + container_id = match.group(1) + return f"docker/{container_id}" + + # Handle other Docker paths + if relative_path.startswith("docker/"): + return relative_path + + # For other cgroups, use the relative path + return relative_path + + +def validate_cgroup_perf_event_access(cgroup_name: str) -> bool: + """Check if a cgroup is available for perf profiling""" + cgroup_version = detect_cgroup_version() + + if cgroup_version == CgroupVersion.V2: + # In cgroup v2, perf events are handled differently + # The cgroup path should exist in the unified hierarchy + if cgroup_name.startswith("docker/"): + # For Docker containers in cgroup v2, check common paths + container_id = cgroup_name.replace("docker/", "") + possible_paths = [ + f"/sys/fs/cgroup/system.slice/docker-{container_id}.scope", + f"/sys/fs/cgroup/docker/{container_id}", + f"/sys/fs/cgroup/system.slice/docker.service/docker/{container_id}", + ] + for path in possible_paths: + if os.path.exists(path) and os.path.isdir(path): + return True + return False + else: + # For other cgroups in v2, check if the path exists + # Handle both absolute and relative paths + if cgroup_name.startswith("/sys/fs/cgroup/"): + cgroup_path = cgroup_name + else: + cgroup_path = f"/sys/fs/cgroup/{cgroup_name}" + return os.path.exists(cgroup_path) and os.path.isdir(cgroup_path) + + else: # cgroup v1 + perf_event_path = f"/sys/fs/cgroup/perf_event/{cgroup_name}" + return os.path.exists(perf_event_path) and os.path.isdir(perf_event_path) + + +def get_top_docker_containers_for_perf(limit: int) -> List[str]: + """Get top Docker containers by resource usage for perf profiling + + Returns individual Docker container cgroup names that exist in perf_event controller. 
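+
+    Flow sketch (assumes the docker CLI is on PATH): `docker stats --no-stream`
+    ranks running containers by CPU%, `docker inspect` resolves full container IDs,
+    and each candidate cgroup is checked for perf_event access before inclusion.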
+ """ + import subprocess + + docker_containers = [] + cgroup_version = detect_cgroup_version() + + try: + # Get running Docker containers with resource stats + result = subprocess.run( + ["docker", "stats", "--no-stream", "--format", "{{.Container}}\t{{.CPUPerc}}\t{{.MemUsage}}"], + capture_output=True, + text=True, + timeout=10, + ) + + if result.returncode == 0: + container_stats = [] + for line in result.stdout.strip().split("\n"): + if line.strip(): + parts = line.split("\t") + if len(parts) >= 2: + container_id = parts[0] + cpu_percent_str = parts[1].replace("%", "") + try: + cpu_percent = float(cpu_percent_str) + container_stats.append((container_id, cpu_percent)) + except ValueError: + continue + + # Sort by CPU usage (descending) + container_stats.sort(key=lambda x: x[1], reverse=True) + + # Get full container IDs and check perf_event access + for container_id, cpu_percent in container_stats[ + : limit * 2 + ]: # Get more than needed in case some don't have perf access + try: + # Get full container ID + full_id_result = subprocess.run( + ["docker", "inspect", "--format", "{{.Id}}", container_id], + capture_output=True, + text=True, + timeout=5, + ) + + if full_id_result.returncode == 0: + full_id = full_id_result.stdout.strip() + + if cgroup_version == CgroupVersion.V2: + # For cgroup v2, we need to find the actual cgroup path + # and use the relative path for perf + possible_paths = [ + f"/sys/fs/cgroup/system.slice/docker-{full_id}.scope", + f"/sys/fs/cgroup/docker/{full_id}", + f"/sys/fs/cgroup/system.slice/docker.service/docker/{full_id}", + ] + + docker_cgroup = None + for path in possible_paths: + if os.path.exists(path) and os.path.isdir(path): + # For cgroup v2, perf expects the relative path from /sys/fs/cgroup/ + docker_cgroup = path.replace("/sys/fs/cgroup/", "") + logger.debug( + f"Found cgroup v2 path for container {container_id}: {path} -> {docker_cgroup}" + ) + break + + if not docker_cgroup: + # Fallback: try to find any docker-related path for this container + try: + import glob + + pattern = f"/sys/fs/cgroup/**/docker*{full_id[:12]}*" + matches = glob.glob(pattern, recursive=True) + if matches: + docker_cgroup = matches[0].replace("/sys/fs/cgroup/", "") + logger.debug(f"Found fallback cgroup v2 path: {matches[0]} -> {docker_cgroup}") + else: + docker_cgroup = f"docker/{full_id}" # Last resort fallback + logger.debug(f"No cgroup v2 path found, using fallback: {docker_cgroup}") + except Exception as e: + docker_cgroup = f"docker/{full_id}" + logger.debug(f"Error finding cgroup v2 path: {e}, using fallback: {docker_cgroup}") + else: + # cgroup v1 format + docker_cgroup = f"docker/{full_id}" + + # Check if this container has perf_event access + if validate_cgroup_perf_event_access(docker_cgroup): + docker_containers.append(docker_cgroup) + logger.debug( + f"Added Docker container for profiling: {container_id} " + f"(CPU: {cpu_percent}%) -> {docker_cgroup}" + ) + + if len(docker_containers) >= limit: + break + else: + logger.debug(f"Docker container {container_id} not available for perf profiling") + + except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e: + logger.debug(f"Failed to get full ID for container {container_id}: {e}") + continue + + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError) as e: + logger.debug(f"Failed to get Docker container stats: {e}") + + return docker_containers + + +def get_top_cgroup_names_for_perf(limit: int = 50, max_docker_containers: int = 0) -> List[str]: + """Get top cgroup names in 
the format needed for perf -G option + + Args: + limit: Maximum total number of cgroups to return + max_docker_containers: If > 0, profile individual Docker containers instead of broad 'docker' cgroup + + Only returns cgroups that exist in both resource controllers (memory/cpu) + and the perf_event controller, since perf needs access to both. + """ + if max_docker_containers > 0: + # Use individual Docker container profiling + docker_containers = get_top_docker_containers_for_perf(max_docker_containers) + + # Get other non-Docker cgroups + top_cgroups = get_top_cgroups_by_usage(limit) + other_cgroups = [] + seen_names = set(docker_containers) # Track unique cgroup names to avoid duplicates + + for cgroup in top_cgroups: + cgroup_name = cgroup_to_perf_name(cgroup.cgroup_path) + + # Skip Docker cgroups (we're handling them individually) + if cgroup_name.startswith("docker"): + continue + + # Skip duplicates + if cgroup_name in seen_names: + logger.debug(f"Skipping duplicate cgroup name {cgroup_name}") + continue + + if validate_cgroup_perf_event_access(cgroup_name): + other_cgroups.append(cgroup_name) + seen_names.add(cgroup_name) + + # Respect total limit + if len(docker_containers) + len(other_cgroups) >= limit: + break + else: + logger.debug(f"Skipping cgroup {cgroup_name} - not available in perf_event controller") + + valid_cgroups = docker_containers + other_cgroups + + if docker_containers: + logger.info( + f"Using individual Docker container profiling: {len(docker_containers)} containers, " + f"{len(other_cgroups)} other cgroups" + ) + + else: + # Use traditional cgroup profiling (including broad 'docker' cgroup) + top_cgroups = get_top_cgroups_by_usage(limit) + valid_cgroups = [] + seen_names = set() # Track unique cgroup names to avoid duplicates + + for cgroup in top_cgroups: + cgroup_name = cgroup_to_perf_name(cgroup.cgroup_path) + + # Skip duplicates (same cgroup from different controllers) + if cgroup_name in seen_names: + logger.debug(f"Skipping duplicate cgroup name {cgroup_name}") + continue + + if validate_cgroup_perf_event_access(cgroup_name): + valid_cgroups.append(cgroup_name) + seen_names.add(cgroup_name) + else: + logger.debug(f"Skipping cgroup {cgroup_name} - not available in perf_event controller") + + if len(valid_cgroups) < limit: + logger.info(f"Filtered cgroups for perf: {len(valid_cgroups)}/{limit} cgroups have perf_event access") + + return valid_cgroups + + +def validate_perf_cgroup_support() -> bool: + """Check if the current perf binary supports cgroup filtering""" + try: + import subprocess + + result = subprocess.run(["perf", "record", "--help"], capture_output=True, text=True, timeout=10) + return "--cgroup" in result.stdout or "-G" in result.stdout + except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): + return False diff --git a/gprofiler/utils/perf_process.py b/gprofiler/utils/perf_process.py index ef8d623b8..167af7fee 100644 --- a/gprofiler/utils/perf_process.py +++ b/gprofiler/utils/perf_process.py @@ -48,6 +48,9 @@ def __init__( extra_args: List[str], processes_to_profile: Optional[List[Process]], switch_timeout_s: int, + use_cgroups: bool = False, + max_cgroups: int = 50, + max_docker_containers: int = 0, ): self._start_time = 0.0 self._frequency = frequency @@ -55,12 +58,77 @@ def __init__( self._output_path = output_path self._type = "dwarf" if is_dwarf else "fp" self._inject_jit = inject_jit + self._use_cgroups = use_cgroups + self._max_cgroups = max_cgroups self._pid_args = [] - if processes_to_profile is not 
None: + self._cgroup_args = [] + + # Determine profiling strategy + if use_cgroups: + from gprofiler.utils.cgroup_utils import ( + get_top_cgroup_names_for_perf, + is_cgroup_available, + validate_perf_cgroup_support, + ) + + # Use cgroup-based profiling for better reliability + if is_cgroup_available() and validate_perf_cgroup_support(): + try: + top_cgroups = get_top_cgroup_names_for_perf(max_cgroups, max_docker_containers) + if top_cgroups: + # Cgroup monitoring requires system-wide mode (-a) + self._pid_args.append("-a") + self._cgroup_args.extend(["-G", ",".join(top_cgroups)]) + logger.info( + f"Using cgroup-based profiling with {len(top_cgroups)} top cgroups: " + f"{top_cgroups[:3]}{'...' if len(top_cgroups) > 3 else ''}" + ) + else: + # Never fall back to system-wide profiling when cgroups are explicitly requested + from gprofiler.exceptions import PerfNoSupportedEvent + + if max_docker_containers > 0: + logger.error( + f"No Docker containers found for profiling despite " + f"--perf-max-docker-containers={max_docker_containers}. " + "This could indicate cgroup v2 compatibility issues or no running containers. " + "Perf profiler will be disabled to prevent system-wide profiling." + ) + raise PerfNoSupportedEvent( + "Docker container profiling requested but no containers available" + ) + elif max_cgroups > 0: + logger.error( + f"No cgroups found for profiling despite --perf-max-cgroups={max_cgroups}. " + "This could indicate cgroup compatibility issues or no active cgroups. " + "Perf profiler will be disabled to prevent system-wide profiling." + ) + raise PerfNoSupportedEvent("Cgroup profiling requested but no cgroups available") + else: + logger.error( + "Cgroup profiling was requested (--perf-use-cgroups) but no specific limits were set. " + "Perf profiler will be disabled to prevent system-wide profiling." + ) + raise PerfNoSupportedEvent( + "Cgroup profiling requested but no containers or cgroups specified" + ) + except Exception as e: + # Never fall back to system-wide profiling when cgroups are explicitly requested + from gprofiler.exceptions import PerfNoSupportedEvent + + logger.error( + f"Failed to get cgroups for profiling: {e}. " + "Perf profiler will be disabled to prevent system-wide profiling." + ) + raise PerfNoSupportedEvent(f"Cgroup profiling failed: {e}") + elif processes_to_profile is not None: + # Traditional PID-based profiling self._pid_args.append("--pid") self._pid_args.append(",".join([str(process.pid) for process in processes_to_profile])) else: + # System-wide profiling self._pid_args.append("-a") + self._extra_args = extra_args self._switch_timeout_s = switch_timeout_s self._process: Optional[Popen] = None @@ -70,6 +138,44 @@ def _log_name(self) -> str: return f"perf ({self._type} mode)" def _get_perf_cmd(self) -> List[str]: + # When using cgroups, perf requires events to be specified before cgroups. + # If no explicit events are provided but cgroups are used, add default event. + # For multiple cgroups, perf requires one event per cgroup. 
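+        # Illustrative final command shape for two cgroups and no explicit events
+        # (cgroup names are made-up examples):
+        #   perf record ... -e cycles -e cycles -a -G docker/aaa,docker/bbb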
+ extra_args = self._extra_args + + # Separate extra_args into perf options and application command + # The "--" separator marks the boundary between perf args and the app command + perf_extra_args = [] + app_command = [] + separator_found = False + + for arg in extra_args: + if arg == "--": + separator_found = True + app_command.append(arg) + elif separator_found: + app_command.append(arg) + else: + perf_extra_args.append(arg) + + if self._cgroup_args and not perf_extra_args: + # Count the number of cgroups (they are comma-separated in -G argument) + cgroup_arg = None + for i, arg in enumerate(self._cgroup_args): + if arg == "-G" and i + 1 < len(self._cgroup_args): + cgroup_arg = self._cgroup_args[i + 1] + break + + if cgroup_arg: + num_cgroups = len(cgroup_arg.split(",")) + # Add one event per cgroup (perf requirement) + perf_extra_args = [] + for _ in range(num_cgroups): + perf_extra_args.extend(["-e", "cycles"]) + else: + # Fallback: single event + perf_extra_args = ["-e", "cycles"] + return ( [ perf_path(), @@ -88,9 +194,11 @@ def _get_perf_cmd(self) -> List[str]: "-m", str(self._MMAP_SIZES[self._type]), ] + + perf_extra_args # Events must come before cgroups + self._pid_args + + self._cgroup_args + (["-k", "1"] if self._inject_jit else []) - + self._extra_args + + app_command # Application command (with "--") must be last ) def start(self) -> None: diff --git a/tests/run_heartbeat_agent.py b/tests/run_heartbeat_agent.py new file mode 100644 index 000000000..14aed9e6a --- /dev/null +++ b/tests/run_heartbeat_agent.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +Test runner for the gProfiler agent with heartbeat mode enabled. + +This script demonstrates how to run the gProfiler agent in heartbeat mode +to receive dynamic profiling commands from the Performance Studio backend. +""" + +import os +import signal +import subprocess +import sys +from pathlib import Path + + +def run_gprofiler_heartbeat_mode(): + """Run gProfiler in heartbeat mode""" + + # Configuration - adjust these values for your environment + config = { + "server_token": "test-token", + "service_name": "test-service", + "api_server": "http://localhost:8000", # Performance Studio backend URL + "server_host": "http://localhost:8000", # Profile upload server URL (can be same) + "output_dir": "/tmp/gprofiler-test", + "log_file": "/tmp/gprofiler-heartbeat.log", + "heartbeat_interval": "10", # seconds + "verbose": True, + } + + # Ensure output directory exists + os.makedirs(config["output_dir"], exist_ok=True) + + # Build the command + gprofiler_path = Path(__file__).parent.parent / "gprofiler" / "main.py" + + cmd = [ + sys.executable, + str(gprofiler_path), + "--enable-heartbeat-server", + "--upload-results", + "--token", + config["server_token"], + "--service-name", + config["service_name"], + "--api-server", + config["api_server"], + "--server-host", + config["server_host"], + "--output-dir", + config["output_dir"], + "--log-file", + config["log_file"], + "--heartbeat-interval", + config["heartbeat_interval"], + "--no-verify", # For testing with localhost + ] + + if config["verbose"]: + cmd.append("--verbose") + + print("🤖 Starting gProfiler in heartbeat mode...") + print(f"📝 Command: {' '.join(cmd)}") + print("=" * 60) + print("The agent will:") + print("1. Send heartbeats to the backend every 10 seconds") + print("2. Wait for profiling commands from the server") + print("3. Execute start/stop commands as received") + print("4. 
Maintain idempotency for duplicate commands")
+    print("=" * 60)
+    print("💡 To test the system:")
+    print("1. Start the Performance Studio backend")
+    print("2. Run this script to start the agent")
+    print("3. Use the backend API to send profiling requests")
+    print("4. Watch the agent logs to see command execution")
+    print("=" * 60)
+    print("\n🚀 Starting agent... (Press Ctrl+C to stop)")
+
+    process = None  # ensure the name is bound even if Popen is interrupted before assignment
+    try:
+        # Start the process
+        process = subprocess.Popen(
+            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, bufsize=1
+        )
+
+        # Monitor output
+        for line in iter(process.stdout.readline, ""):
+            print(f"[AGENT] {line.rstrip()}")
+
+        process.wait()
+
+    except KeyboardInterrupt:
+        print("\n🛑 Received interrupt signal, stopping agent...")
+        if process is not None:
+            process.send_signal(signal.SIGINT)
+            try:
+                process.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                print("⚠️ Process didn't stop gracefully, forcing termination...")
+                process.kill()
+                process.wait()
+
+    except Exception as e:
+        print(f"❌ Error running gProfiler: {e}")
+        return 1
+
+    print("✅ Agent stopped")
+    return 0
+
+
+def print_usage():
+    """Print usage instructions"""
+    print("📖 gProfiler Heartbeat Mode Test Runner")
+    print("=" * 50)
+    print("\nThis script runs gProfiler in heartbeat mode for testing.")
+    print("\nPrerequisites:")
+    print("1. Performance Studio backend running on http://localhost:8000")
+    print("2. gProfiler agent code in the expected location")
+    print("3. Python dependencies installed")
+    print("\nUsage:")
+    print(f"  {sys.argv[0]}")
+    print("\nConfiguration:")
+    print("- Edit the 'config' dictionary in this script to customize settings")
+    print("- Logs will be written to /tmp/gprofiler-heartbeat.log")
+    print("- Profiles will be saved to /tmp/gprofiler-test/")
+    print("\nTesting flow:")
+    print("1. Start the backend server")
+    print("2. Run this script to start the agent")
+    print("3. Use test_heartbeat_system.py to send commands")
+    print("4. Watch the agent respond to commands")
+
+
+def main():
+    """Main function"""
+    if len(sys.argv) > 1 and sys.argv[1] in ["-h", "--help"]:
+        print_usage()
+        return 0
+
+    return run_gprofiler_heartbeat_mode()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/tests/test_heartbeat_system.py b/tests/test_heartbeat_system.py
new file mode 100644
index 000000000..a0a15e976
--- /dev/null
+++ b/tests/test_heartbeat_system.py
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+"""
+Test script to verify the heartbeat-based profiling control system.
+
+This script demonstrates:
+1. Agent sending heartbeat to backend
+2. Backend responding with start/stop commands
+3. Agent acting on commands with idempotency
+4. Command completion acknowledgments
+
+Supports both mock mode (default) and live mode with real backend.
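+
+Example invocations (paths assumed relative to the repo root):
+    python tests/test_heartbeat_system.py          # mock mode (default); no backend needed
+    python tests/test_heartbeat_system.py --live   # live mode against http://localhost:8000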
+""" + +import sys +import time +import unittest.mock +from datetime import datetime +from typing import Any, Dict, Optional + +import requests + +# Configuration +BACKEND_URL = "http://localhost:8000" # Adjust based on your setup +SERVICE_NAME = "test-service" +HOSTNAME = "test-host" +IP_ADDRESS = "127.0.0.1" + +# Check if we should run in mock mode (no real backend) +MOCK_MODE = "--live" not in sys.argv # Default to mock mode unless --live specified + + +class HeartbeatClient: + """Client to simulate agent heartbeat behavior""" + + def __init__(self, backend_url: str, service_name: str, hostname: str, ip_address: str): + self.backend_url = backend_url.rstrip("/") + self.service_name = service_name + self.hostname = hostname + self.ip_address = ip_address + self.last_command_id: Optional[str] = None + self.executed_commands = set() + + def send_heartbeat(self) -> Optional[Dict[str, Any]]: + """Send heartbeat to backend and return response""" + heartbeat_data = { + "ip_address": self.ip_address, + "hostname": self.hostname, + "service_name": self.service_name, + "last_command_id": self.last_command_id, + "status": "active", + "timestamp": datetime.now().isoformat(), + } + + try: + response = requests.post(f"{self.backend_url}/api/metrics/heartbeat", json=heartbeat_data, timeout=10) + + if response.status_code == 200: + result = response.json() + print(f"✓ Heartbeat successful: {result.get('message')}") + + if result.get("profiling_command") and result.get("command_id"): + command_id = result["command_id"] + profiling_command = result["profiling_command"] + command_type = profiling_command.get("command_type", "unknown") + + print(f"📋 Received command: {command_type} (ID: {command_id})") + + # Check idempotency + if command_id in self.executed_commands: + print(f"⚠️ Command {command_id} already executed, skipping...") + return None + + # Mark as executed + self.executed_commands.add(command_id) + self.last_command_id = command_id + + return { + "command_type": command_type, + "command_id": command_id, + "profiling_command": profiling_command, + } + else: + print("📭 No pending commands") + return None + else: + print(f"❌ Heartbeat failed: {response.status_code} - {response.text}") + return None + + except Exception as e: + print(f"❌ Heartbeat error: {e}") + return None + + def send_command_completion( + self, command_id: str, status: str, execution_time: int = 0, error_message: str = None, results_path: str = None + ) -> bool: + """Send command completion status to backend""" + completion_data = { + "command_id": command_id, + "hostname": self.hostname, + "status": status, + "execution_time": execution_time, + "error_message": error_message, + "results_path": results_path, + } + + try: + response = requests.post( + f"{self.backend_url}/api/metrics/command_completion", json=completion_data, timeout=10 + ) + + if response.status_code == 200: + print(f"✅ Command completion sent successfully for {command_id} with status: {status}") + return True + else: + print(f"❌ Failed to send command completion: {response.status_code} - {response.text}") + return False + + except Exception as e: + print(f"❌ Error sending command completion: {e}") + return False + + def simulate_profiling_action(self, command_type: str, command_id: str): + """Simulate profiling action (start/stop)""" + if command_type == "start": + print(f"🚀 Starting profiler for command {command_id}") + # Simulate profiling work + time.sleep(2) + print("✅ Profiler completed successfully") + # Send completion acknowledgment + 
self.send_command_completion(command_id, "completed", execution_time=2) + elif command_type == "stop": + print(f"🛑 Stopping profiler for command {command_id}") + # Simulate stopping + time.sleep(1) + print("✅ Profiler stopped successfully") + # Send completion acknowledgment + self.send_command_completion(command_id, "completed", execution_time=1) + else: + print(f"⚠️ Unknown command type: {command_type}") + # Send failure acknowledgment + self.send_command_completion(command_id, "failed", error_message=f"Unknown command type: {command_type}") + + +def create_test_profiling_request(backend_url: str, service_name: str, command_type: str = "start") -> bool: + """Create a test profiling request""" + request_data = { + "service_name": service_name, + "command_type": command_type, + "duration": 60, + "frequency": 11, + "profiling_mode": "cpu", + "target_hostnames": [HOSTNAME], + "additional_args": {"test": True}, + } + + try: + response = requests.post(f"{backend_url}/api/metrics/profile_request", json=request_data, timeout=10) + + if response.status_code == 200: + result = response.json() + print(f"✅ Profiling request created: {result.get('message')}") + print(f" Request ID: {result.get('request_id')}") + print(f" Command ID: {result.get('command_id')}") + return True + else: + print(f"❌ Failed to create profiling request: {response.status_code} - {response.text}") + return False + + except Exception as e: + print(f"❌ Error creating profiling request: {e}") + return False + + +def create_mock_responses(): + """Create mock responses for testing without a real backend""" + mock_state = {"pending_commands": [], "completed_commands": [], "heartbeat_count": 0} + + def mock_heartbeat_post(url, json=None, timeout=None): # noqa: F811 + """Mock heartbeat endpoint""" + mock_state["heartbeat_count"] += 1 + + # Mock response object + response = unittest.mock.Mock() + response.status_code = 200 + + # Check if there are pending commands + if mock_state["pending_commands"]: + command = mock_state["pending_commands"].pop(0) + response.json.return_value = { + "message": "Heartbeat received", + "command_id": command["command_id"], + "profiling_command": command["profiling_command"], + } + else: + response.json.return_value = {"message": "Heartbeat received, no pending commands"} + + return response + + def mock_profile_request_post(url, json=None, timeout=None): # noqa: F811 + """Mock profile request endpoint""" + # Generate unique IDs based on total requests made + total_requests = len(mock_state["completed_commands"]) + len(mock_state["pending_commands"]) + 1 + command_id = f"cmd_{total_requests}" + request_id = f"req_{total_requests}" + + # Add command to pending queue + mock_state["pending_commands"].append( + { + "command_id": command_id, + "profiling_command": { + "command_type": json.get("command_type", "start"), + "combined_config": { + "duration": json.get("duration", 60), + "frequency": json.get("frequency", 11), + "profiling_mode": json.get("profiling_mode", "cpu"), + }, + }, + } + ) + + response = unittest.mock.Mock() + response.status_code = 200 + response.json.return_value = { + "message": "Profiling request created", + "request_id": request_id, + "command_id": command_id, + } + + return response + + def mock_command_completion_post(url, json=None, timeout=None): # noqa: F811 + """Mock command completion endpoint""" + mock_state["completed_commands"].append( + { + "command_id": json.get("command_id"), + "status": json.get("status"), + "execution_time": json.get("execution_time"), + } + ) + + 
response = unittest.mock.Mock() + response.status_code = 200 + response.json.return_value = {"message": "Command completion received"} + + return response + + def mock_post(url, json=None, timeout=None): # noqa: F811 + """Route mock requests to appropriate handlers""" + if "/heartbeat" in url: + return mock_heartbeat_post(url, json, timeout) + elif "/profile_request" in url: + return mock_profile_request_post(url, json, timeout) + elif "/command_completion" in url: + return mock_command_completion_post(url, json, timeout) + else: + # Unknown endpoint + response = unittest.mock.Mock() + response.status_code = 404 + response.text = "Not found" + return response + + return mock_post, mock_state + + +def run_tests(): + """Run the actual test logic""" + + # Initialize test client + client = HeartbeatClient(BACKEND_URL, SERVICE_NAME, HOSTNAME, IP_ADDRESS) + + # Test 1: Send initial heartbeat (should have no commands) + print("\n1️⃣ Test: Initial heartbeat (no commands expected)") + client.send_heartbeat() + + # Test 2: Create a START profiling request + print("\n2️⃣ Test: Create START profiling request") + if create_test_profiling_request(BACKEND_URL, SERVICE_NAME, "start"): + time.sleep(0.1) # Give backend time to process + + # Send heartbeat to receive the command + print("\n 📡 Sending heartbeat to receive command...") + command = client.send_heartbeat() + + if command: + client.simulate_profiling_action(command["command_type"], command["command_id"]) + + # Test idempotency - send heartbeat again + print("\n 🔄 Testing idempotency - sending heartbeat again...") + command = client.send_heartbeat() + if command is None: + print("✅ Idempotency working - no duplicate command received") + + # Test 3: Create a STOP profiling request + print("\n3️⃣ Test: Create STOP profiling request") + if create_test_profiling_request(BACKEND_URL, SERVICE_NAME, "stop"): + time.sleep(0.1) # Give backend time to process + + # Send heartbeat to receive the stop command + print("\n 📡 Sending heartbeat to receive stop command...") + command = client.send_heartbeat() + + if command: + client.simulate_profiling_action(command["command_type"], command["command_id"]) + + # Test 4: Multiple heartbeats with no commands + print("\n4️⃣ Test: Multiple heartbeats with no pending commands") + for i in range(3): + print(f"\n Heartbeat {i+1}/3:") + client.send_heartbeat() + time.sleep(0.1) + + print("\n✅ Test completed!") + print("\nTest Summary:") + print(f" - Executed commands: {len(client.executed_commands)}") + print(f" - Last command ID: {client.last_command_id}") + print(f" - Commands executed: {list(client.executed_commands)}") + + +def main(): + """Main test function""" + print("🧪 Testing Heartbeat-Based Profiling Control System") + + if MOCK_MODE: + print("🎭 Running in MOCK MODE (no real backend required)") + print(" Use --live flag to test against real backend on localhost:8000") + mock_post, mock_state = create_mock_responses() + + # Patch requests.post for mock mode + with unittest.mock.patch("requests.post", side_effect=mock_post): + print("=" * 60) + run_tests() + + # Print mock state summary + print("\n📊 Mock Backend State:") + print(f" - Total heartbeats: {mock_state['heartbeat_count']}") + print(f" - Pending commands: {len(mock_state['pending_commands'])}") + print(f" - Completed commands: {len(mock_state['completed_commands'])}") + + if mock_state["completed_commands"]: + print(" - Command completions:") + for cmd in mock_state["completed_commands"]: + print(f" * {cmd['command_id']}: {cmd['status']} 
({cmd['execution_time']}s)")
+
+    else:
+        print("🌐 Running in LIVE MODE (requires backend on localhost:8000)")
+        print("=" * 60)
+        run_tests()
+
+
+if __name__ == "__main__":
+    main()