fix(metrics): correct GFE metrics extraction and enable by default #17561
sinhasubham wants to merge 1 commit into
Code Review
This pull request introduces deferred metrics recording for streaming and async streaming RPC responses in Google Cloud Spanner by wrapping them in dedicated wrapper classes. It also implements GFE latency extraction from response metadata and adds corresponding tests. The review feedback highlights four robustness improvements: detect streaming responses by their iterator methods (__next__ and __anext__) rather than their iterable methods, so that standard iterables are not wrapped by mistake; wrap telemetry and metrics recording in try-except blocks so that telemetry failures do not disrupt the main application flow; validate metadata elements before unpacking them; and decode bytes metadata values before regex matching.
    if hasattr(response, "__anext__") or hasattr(response, "__aiter__"):
        return _AsyncStreamingResponseWrapper(response, tracer)
    elif hasattr(response, "__next__") or hasattr(response, "__iter__"):
        return _StreamingResponseWrapper(response, tracer)
Checking hasattr(response, "__iter__") or hasattr(response, "__aiter__") is too broad because many standard unary response types (such as lists, dicts, tuples, or custom iterables) are iterable but are not streaming responses. If a unary RPC returns an iterable, it will be incorrectly wrapped in _StreamingResponseWrapper, which will break the caller's expectations.
Additionally, in unit tests, MagicMock objects have __iter__ by default, which causes them to be incorrectly wrapped.
To correctly identify streaming responses, we should only check for the iterator protocol methods __next__ and __anext__. Any gRPC streaming response is an iterator and must implement these methods.
Suggested change:
    if hasattr(response, "__anext__"):
        return _AsyncStreamingResponseWrapper(response, tracer)
    elif hasattr(response, "__next__"):
        return _StreamingResponseWrapper(response, tracer)
    else:
        # Unary call: execute completion and record metrics immediately
        tracer.record_attempt_completion()
        metadata = []
        if hasattr(response, "initial_metadata"):
            try:
                metadata.extend(response.initial_metadata() or [])
            except Exception:
                pass
        if hasattr(response, "trailing_metadata"):
            try:
                metadata.extend(response.trailing_metadata() or [])
            except Exception:
                pass
        tracer.record_gfe_metrics(metadata)
        return response
The metrics recording block for unary calls is not wrapped in a try-except block. If tracer.record_attempt_completion() or tracer.record_gfe_metrics(metadata) raises an exception (e.g., due to OpenTelemetry configuration issues or unexpected metadata formats), it will crash the entire unary RPC call and prevent the response from being returned. Telemetry and metrics collection should be non-blocking and fail-safe, meaning they should never disrupt the main application flow. Avoid broad except Exception: blocks that silently pass; instead, log the exception to aid in debugging.
Suggested change:
    else:
        # Unary call: execute completion and record metrics immediately
        try:
            tracer.record_attempt_completion()
            metadata = []
            if hasattr(response, "initial_metadata"):
                try:
                    metadata.extend(response.initial_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve initial metadata: {e}")
            if hasattr(response, "trailing_metadata"):
                try:
                    metadata.extend(response.trailing_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve trailing metadata: {e}")
            tracer.record_gfe_metrics(metadata)
        except Exception as e:
            logger.warning(f"Failed to record metrics: {e}")
        return response
References
- Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.
    def _record_metrics(self):
        if self._metrics_recorded:
            return
        self._metrics_recorded = True
        self._tracer.record_attempt_completion()
        metadata = []
        if hasattr(self._response, "initial_metadata"):
            try:
                metadata.extend(self._response.initial_metadata() or [])
            except Exception:
                pass
        if hasattr(self._response, "trailing_metadata"):
            try:
                metadata.extend(self._response.trailing_metadata() or [])
            except Exception:
                pass
        self._tracer.record_gfe_metrics(metadata)
Similar to the unary call metrics recording, if any exception is raised during _record_metrics (e.g., in record_attempt_completion or record_gfe_metrics), it will propagate to the caller of __next__. This can mask the original StopIteration or other exceptions, or crash the application during stream consumption. We should wrap the entire metrics recording logic in a try-except block to ensure telemetry failures are safe, and log any exceptions to avoid silent failures.
Suggested change:
    def _record_metrics(self):
        if self._metrics_recorded:
            return
        self._metrics_recorded = True
        try:
            self._tracer.record_attempt_completion()
            metadata = []
            if hasattr(self._response, "initial_metadata"):
                try:
                    metadata.extend(self._response.initial_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve initial metadata: {e}")
            if hasattr(self._response, "trailing_metadata"):
                try:
                    metadata.extend(self._response.trailing_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve trailing metadata: {e}")
            self._tracer.record_gfe_metrics(metadata)
        except Exception as e:
            logger.warning(f"Failed to record metrics: {e}")
References
- Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.
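To see why the guard matters for stream consumers, the following sketch wraps an iterator and fires a callback once when the stream ends. StreamingWrapperSketch is a simplified, hypothetical stand-in for _StreamingResponseWrapper; the real class in the PR takes a tracer and reads gRPC metadata, which is omitted here.

```python
import logging

logger = logging.getLogger(__name__)


class StreamingWrapperSketch:
    """Iterator wrapper that fires a callback once the stream ends.

    Simplified, hypothetical stand-in for _StreamingResponseWrapper.
    """

    def __init__(self, response, on_done):
        self._response = response
        self._on_done = on_done
        self._metrics_recorded = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._response)
        except Exception:
            # Covers StopIteration and real stream errors. The bare raise
            # re-raises the original exception unchanged.
            self._record_metrics()
            raise

    def _record_metrics(self):
        if self._metrics_recorded:
            return
        self._metrics_recorded = True
        try:
            self._on_done()
        except Exception:
            logger.warning("Failed to record metrics", exc_info=True)


def failing_callback():
    raise RuntimeError("telemetry backend unavailable")


items = list(StreamingWrapperSketch(iter([1, 2, 3]), failing_callback))
print(items)  # [1, 2, 3]
```

Because the callback failure is caught inside _record_metrics, the consumer still sees a normal StopIteration and receives every item.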
    def _record_metrics(self):
        if self._metrics_recorded:
            return
        self._metrics_recorded = True
        self._tracer.record_attempt_completion()
        metadata = []
        if hasattr(self._response, "initial_metadata"):
            try:
                metadata.extend(self._response.initial_metadata() or [])
            except Exception:
                pass
        if hasattr(self._response, "trailing_metadata"):
            try:
                metadata.extend(self._response.trailing_metadata() or [])
            except Exception:
                pass
        self._tracer.record_gfe_metrics(metadata)
For the asynchronous streaming wrapper, we should also wrap the metrics recording logic in a try-except block to prevent telemetry failures from crashing the async stream or masking StopAsyncIteration. Ensure exceptions are logged rather than silently ignored.
Suggested change:
    def _record_metrics(self):
        if self._metrics_recorded:
            return
        self._metrics_recorded = True
        try:
            self._tracer.record_attempt_completion()
            metadata = []
            if hasattr(self._response, "initial_metadata"):
                try:
                    metadata.extend(self._response.initial_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve initial metadata: {e}")
            if hasattr(self._response, "trailing_metadata"):
                try:
                    metadata.extend(self._response.trailing_metadata() or [])
                except Exception as e:
                    logger.warning(f"Failed to retrieve trailing metadata: {e}")
            self._tracer.record_gfe_metrics(metadata)
        except Exception as e:
            logger.warning(f"Failed to record metrics: {e}")
References
- Avoid broad except Exception: blocks that silently return None. Instead, log the exception (e.g., using logger.warning) to aid in debugging and prevent masking underlying issues.
    elif isinstance(metadata, (list, tuple)):
        for key, val in metadata:
            if key and str(key).lower() in ("server-timing", "server_timing"):
                if isinstance(val, (list, tuple)):
                    header_vals.extend(val)
                else:
                    header_vals.append(val)
When iterating over metadata as a list or tuple, unpacking for key, val in metadata assumes that every element in metadata is a sequence of exactly two elements. If metadata contains any malformed elements, this will crash the metrics extraction: a 1-tuple or a string whose length is not 2 raises a ValueError, and None raises a TypeError. We should defensively verify that each item is a sequence of length 2 before unpacking.
Suggested change:
    elif isinstance(metadata, (list, tuple)):
        for item in metadata:
            if isinstance(item, (list, tuple)) and len(item) == 2:
                key, val = item
                if key and str(key).lower() in ("server-timing", "server_timing"):
                    if isinstance(val, (list, tuple)):
                        header_vals.extend(val)
                    else:
                        header_vals.append(val)
    if not isinstance(header_val, str):
        header_val = str(header_val)
If header_val is of type bytes (which is common for gRPC metadata values), calling str(header_val) in Python 3 will produce the string representation b'...' (including the literal b and quotes). This will prevent the regex from matching correctly or cause unexpected behavior. We should decode bytes to str using .decode("utf-8") first.
Suggested change:
    if isinstance(header_val, bytes):
        try:
            header_val = header_val.decode("utf-8")
        except Exception:
            header_val = str(header_val)
    elif not isinstance(header_val, str):
        header_val = str(header_val)
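The difference between str() and decode() on bytes is easy to demonstrate. The sketch below assumes a header of the form "gfet4t7; dur=<milliseconds>"; the pattern _GFE_PATTERN and the helpers normalize_header and parse_gfe_latency are hypothetical, since the PR's actual regex is not shown here.

```python
import re

# Assumed header shape, e.g. "gfet4t7; dur=123". The PR's real regex may differ.
_GFE_PATTERN = re.compile(r"gfet4t7;\s*dur=(\d+(?:\.\d+)?)")


def normalize_header(header_val):
    """Return header_val as text, decoding bytes instead of taking their repr."""
    if isinstance(header_val, bytes):
        # errors="replace" avoids raising on invalid UTF-8.
        return header_val.decode("utf-8", errors="replace")
    if not isinstance(header_val, str):
        return str(header_val)
    return header_val


def parse_gfe_latency(header_val):
    """Return the latency as a float, or None if no match is found."""
    match = _GFE_PATTERN.search(normalize_header(header_val))
    return float(match.group(1)) if match else None


print(str(b"gfet4t7; dur=123"))                 # b'gfet4t7; dur=123'
print(normalize_header(b"gfet4t7; dur=123"))    # gfet4t7; dur=123
print(parse_gfe_latency(b"gfet4t7; dur=123"))   # 123.0
print(parse_gfe_latency("no timing here"))      # None
```

Using errors="replace" is a design choice in this sketch: it removes the need for the try-except fallback, at the cost of substituting replacement characters for undecodable bytes.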
Description
This PR resolves a critical issue where Spanner GFE (Google Front End) latency metrics were not being properly captured, and ensures these metrics are always enabled by default.
Key Changes:
- Fixed a bug in MetricsInterceptor where initial_metadata() returning standard headers (e.g., content-type) masked trailing_metadata(). The server-timing header is now extracted from both initial and trailing metadata.
- Added _StreamingResponseWrapper and _AsyncStreamingResponseWrapper to the interceptor. These defer metrics recording until the streaming iterators finish, so that trailing_metadata is fully populated before extraction is attempted.
- Removed the gfe_enabled toggle in SpannerMetricsTracerFactory. GFE metrics capture is now always on whenever OpenTelemetry tracing is enabled.