1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Manages OpenTelemetry tracing span creation and handling. This is a PREVIEW FEATURE: Coverage and functionality may change."""
16
17import logging
18import os
19
20from contextlib import contextmanager
21from urllib.parse import urlparse
22from google.api_core import exceptions as api_exceptions
23from google.api_core import retry as api_retry
24from google.cloud.storage import __version__
25from google.cloud.storage.retry import ConditionalRetryPolicy
26
27
28ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"
29_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False
30
31
32def _parse_bool_env(name: str, default: bool = False) -> bool:
33 val = os.environ.get(name, None)
34 if val is None:
35 return default
36 return str(val).strip().lower() in {"1", "true", "yes", "on"}
37
38
39enable_otel_traces = _parse_bool_env(
40 ENABLE_OTEL_TRACES_ENV_VAR, _DEFAULT_ENABLE_OTEL_TRACES_VALUE
41)
42logger = logging.getLogger(__name__)
43
44try:
45 from opentelemetry import trace
46
47 HAS_OPENTELEMETRY = True
48
49except ImportError:
50 logger.debug(
51 "This service is instrumented using OpenTelemetry. "
52 "OpenTelemetry or one of its components could not be imported; "
53 "please add compatible versions of opentelemetry-api and "
54 "opentelemetry-instrumentation packages in order to get Storage "
55 "Tracing data."
56 )
57 HAS_OPENTELEMETRY = False
58
59_default_attributes = {
60 "rpc.service": "CloudStorage",
61 "rpc.system": "http",
62 "user_agent.original": f"gcloud-python/{__version__}",
63}
64
65_cloud_trace_adoption_attrs = {
66 "gcp.client.service": "storage",
67 "gcp.client.version": __version__,
68 "gcp.client.repo": "googleapis/python-storage",
69}
70
71
72@contextmanager
73def create_trace_span(name, attributes=None, client=None, api_request=None, retry=None):
74 """Creates a context manager for a new span and set it as the current span
75 in the configured tracer. If no configuration exists yields None."""
76 if not HAS_OPENTELEMETRY or not enable_otel_traces:
77 yield None
78 return
79
80 tracer = trace.get_tracer(__name__)
81 final_attributes = _get_final_attributes(attributes, client, api_request, retry)
82 # Yield new span.
83 with tracer.start_as_current_span(
84 name=name, kind=trace.SpanKind.CLIENT, attributes=final_attributes
85 ) as span:
86 try:
87 yield span
88 except api_exceptions.GoogleAPICallError as error:
89 span.set_status(trace.Status(trace.StatusCode.ERROR))
90 span.record_exception(error)
91 raise
92
93
94def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None):
95 collected_attr = _default_attributes.copy()
96 collected_attr.update(_cloud_trace_adoption_attrs)
97 if api_request:
98 collected_attr.update(_set_api_request_attr(api_request, client))
99 if isinstance(retry, api_retry.Retry):
100 collected_attr.update(_set_retry_attr(retry))
101 if isinstance(retry, ConditionalRetryPolicy):
102 collected_attr.update(
103 _set_retry_attr(retry.retry_policy, retry.conditional_predicate)
104 )
105 if attributes:
106 collected_attr.update(attributes)
107 final_attributes = {k: v for k, v in collected_attr.items() if v is not None}
108 return final_attributes
109
110
111def _set_api_request_attr(request, client):
112 attr = {}
113 if request.get("method"):
114 attr["http.request.method"] = request.get("method")
115 if request.get("path"):
116 full_url = client._connection.build_api_url(request.get("path"))
117 attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True))
118 if "timeout" in request:
119 attr["connect_timeout,read_timeout"] = str(request.get("timeout"))
120 return attr
121
122
123def _set_retry_attr(retry, conditional_predicate=None):
124 predicate = conditional_predicate if conditional_predicate else retry._predicate
125 retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}"
126 return {"retry": retry_info}
127
128
129def _get_opentelemetry_attributes_from_url(url, strip_query=True):
130 """Helper to assemble OpenTelemetry span attributes from a URL."""
131 u = urlparse(url)
132 netloc = u.netloc
133 # u.hostname is always lowercase. We parse netloc to preserve casing.
134 # netloc format: [userinfo@]host[:port]
135 if "@" in netloc:
136 netloc = netloc.split("@", 1)[1]
137 if ":" in netloc and not netloc.endswith("]"): # Handle IPv6 literal
138 netloc = netloc.split(":", 1)[0]
139
140 attributes = {
141 "server.address": netloc,
142 "server.port": u.port,
143 "url.scheme": u.scheme,
144 "url.path": u.path,
145 }
146 if not strip_query:
147 attributes["url.query"] = u.query
148
149 return attributes