1# Copyright 2024 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Manages OpenTelemetry tracing span creation and handling. This is a PREVIEW FEATURE: Coverage and functionality may change.""" 
    16 
    17import logging 
    18import os 
    19 
    20from contextlib import contextmanager 
    21from urllib.parse import urlparse 
    22from google.api_core import exceptions as api_exceptions 
    23from google.api_core import retry as api_retry 
    24from google.cloud.storage import __version__ 
    25from google.cloud.storage.retry import ConditionalRetryPolicy 
    26 
    27 
    28ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES" 
    29_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False 
    30 
    31 
    32def _parse_bool_env(name: str, default: bool = False) -> bool: 
    33    val = os.environ.get(name, None) 
    34    if val is None: 
    35        return default 
    36    return str(val).strip().lower() in {"1", "true", "yes", "on"} 
    37 
    38 
    39enable_otel_traces = _parse_bool_env( 
    40    ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE 
    41) 
    42logger = logging.getLogger(__name__) 
    43 
    44try: 
    45    from opentelemetry import trace 
    46 
    47    HAS_OPENTELEMETRY = True 
    48 
    49except ImportError: 
    50    logger.debug( 
    51        "This service is instrumented using OpenTelemetry. " 
    52        "OpenTelemetry or one of its components could not be imported; " 
    53        "please add compatible versions of opentelemetry-api and " 
    54        "opentelemetry-instrumentation packages in order to get Storage " 
    55        "Tracing data." 
    56    ) 
    57    HAS_OPENTELEMETRY = False 
    58 
    59_default_attributes = { 
    60    "rpc.service": "CloudStorage", 
    61    "rpc.system": "http", 
    62    "user_agent.original": f"gcloud-python/{__version__}", 
    63} 
    64 
    65_cloud_trace_adoption_attrs = { 
    66    "gcp.client.service": "storage", 
    67    "gcp.client.version": __version__, 
    68    "gcp.client.repo": "googleapis/python-storage", 
    69} 
    70 
    71 
    72@contextmanager 
    73def create_trace_span(name, attributes=None, client=None, api_request=None, retry=None): 
    74    """Creates a context manager for a new span and set it as the current span 
    75    in the configured tracer. If no configuration exists yields None.""" 
    76    if not HAS_OPENTELEMETRY or not enable_otel_traces: 
    77        yield None 
    78        return 
    79 
    80    tracer = trace.get_tracer(__name__) 
    81    final_attributes = _get_final_attributes(attributes, client, api_request, retry) 
    82    # Yield new span. 
    83    with tracer.start_as_current_span( 
    84        name=name, kind=trace.SpanKind.CLIENT, attributes=final_attributes 
    85    ) as span: 
    86        try: 
    87            yield span 
    88        except api_exceptions.GoogleAPICallError as error: 
    89            span.set_status(trace.Status(trace.StatusCode.ERROR)) 
    90            span.record_exception(error) 
    91            raise 
    92 
    93 
    94def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None): 
    95    collected_attr = _default_attributes.copy() 
    96    collected_attr.update(_cloud_trace_adoption_attrs) 
    97    if api_request: 
    98        collected_attr.update(_set_api_request_attr(api_request, client)) 
    99    if isinstance(retry, api_retry.Retry): 
    100        collected_attr.update(_set_retry_attr(retry)) 
    101    if isinstance(retry, ConditionalRetryPolicy): 
    102        collected_attr.update( 
    103            _set_retry_attr(retry.retry_policy, retry.conditional_predicate) 
    104        ) 
    105    if attributes: 
    106        collected_attr.update(attributes) 
    107    final_attributes = {k: v for k, v in collected_attr.items() if v is not None} 
    108    return final_attributes 
    109 
    110 
    111def _set_api_request_attr(request, client): 
    112    attr = {} 
    113    if request.get("method"): 
    114        attr["http.request.method"] = request.get("method") 
    115    if request.get("path"): 
    116        full_url = client._connection.build_api_url(request.get("path")) 
    117        attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True)) 
    118    if "timeout" in request: 
    119        attr["connect_timeout,read_timeout"] = str(request.get("timeout")) 
    120    return attr 
    121 
    122 
    123def _set_retry_attr(retry, conditional_predicate=None): 
    124    predicate = conditional_predicate if conditional_predicate else retry._predicate 
    125    retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}" 
    126    return {"retry": retry_info} 
    127 
    128 
    129def _get_opentelemetry_attributes_from_url(url, strip_query=True): 
    130    """Helper to assemble OpenTelemetry span attributes from a URL.""" 
    131    u = urlparse(url) 
    132    netloc = u.netloc 
    133    # u.hostname is always lowercase. We parse netloc to preserve casing. 
    134    # netloc format: [userinfo@]host[:port] 
    135    if "@" in netloc: 
    136        netloc = netloc.split("@", 1)[1] 
    137    if ":" in netloc and not netloc.endswith("]"):  # Handle IPv6 literal 
    138        netloc = netloc.split(":", 1)[0] 
    139 
    140    attributes = { 
    141        "server.address": netloc, 
    142        "server.port": u.port, 
    143        "url.scheme": u.scheme, 
    144        "url.path": u.path, 
    145    } 
    146    if not strip_query: 
    147        attributes["url.query"] = u.query 
    148 
    149    return attributes