Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/_opentelemetry_tracing.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1# Copyright 2024 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Manages OpenTelemetry tracing span creation and handling. This is a PREVIEW FEATURE: Coverage and functionality may change.""" 

16 

17import logging 

18import os 

19 

20from contextlib import contextmanager 

21from urllib.parse import urlparse 

22from google.api_core import exceptions as api_exceptions 

23from google.api_core import retry as api_retry 

24from google.cloud.storage import __version__ 

25from google.cloud.storage.retry import ConditionalRetryPolicy 

26 

27 

# Name of the environment variable read at import time to decide whether
# OpenTelemetry tracing is switched on.
ENABLE_OTEL_TRACES_ENV_VAR = "ENABLE_GCS_PYTHON_CLIENT_OTEL_TRACES"
# Value used when the environment variable is not set: tracing is off.
_DEFAULT_ENABLE_OTEL_TRACES_VALUE = False

30 

31 

32def _parse_bool_env(name: str, default: bool = False) -> bool: 

33 val = os.environ.get(name, None) 

34 if val is None: 

35 return default 

36 return str(val).strip().lower() in {"1", "true", "yes", "on"} 

37 

38 

# The opt-in flag is resolved once, when this module is imported.
enable_otel_traces = _parse_bool_env(
    ENABLE_OTEL_TRACES_ENV_VAR,
    _DEFAULT_ENABLE_OTEL_TRACES_VALUE,
)
logger = logging.getLogger(__name__)

# OpenTelemetry is optional: record whether it could be imported so that span
# creation can be bypassed when it is missing.
try:
    from opentelemetry import trace
except ImportError:
    HAS_OPENTELEMETRY = False
    logger.debug(
        "This service is instrumented using OpenTelemetry. "
        "OpenTelemetry or one of its components could not be imported; "
        "please add compatible versions of opentelemetry-api and "
        "opentelemetry-instrumentation packages in order to get Storage "
        "Tracing data."
    )
else:
    HAS_OPENTELEMETRY = True

58 

# Base attributes copied into the attribute set of every span created by
# this module (see _get_final_attributes).
_default_attributes = {
    "rpc.service": "CloudStorage",
    "rpc.system": "http",
    "user_agent.original": f"gcloud-python/{__version__}",
}

# Client-identifying attributes merged on top of the base attributes for
# every span: which client library, which version, and which repository.
_cloud_trace_adoption_attrs = {
    "gcp.client.service": "storage",
    "gcp.client.version": __version__,
    "gcp.client.repo": "googleapis/python-storage",
}

70 

71 

@contextmanager
def create_trace_span(name, attributes=None, client=None, api_request=None, retry=None):
    """Open a client span and make it the current span for the ``with`` body.

    When OpenTelemetry could not be imported, or tracing has not been enabled,
    no span is created and ``None`` is yielded instead.

    Args:
        name: Name given to the new span.
        attributes: Optional mapping of extra span attributes; these are
            applied last and override the computed ones.
        client: Optional client whose ``_connection`` is used to build the
            full request URL for the URL attributes.
        api_request: Optional dict describing the request; its ``method``,
            ``path`` and ``timeout`` entries are turned into attributes.
        retry: Optional retry configuration summarised in a ``retry``
            attribute.

    Yields:
        The started span, or ``None`` when tracing is inactive.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: Re-raised unchanged
            after the span has been given an error status and the exception
            has been recorded on it.
    """
    tracing_active = HAS_OPENTELEMETRY and enable_otel_traces
    if not tracing_active:
        yield None
        return

    tracer = trace.get_tracer(__name__)
    span_attributes = _get_final_attributes(attributes, client, api_request, retry)
    span_context = tracer.start_as_current_span(
        name=name,
        kind=trace.SpanKind.CLIENT,
        attributes=span_attributes,
    )
    with span_context as span:
        try:
            yield span
        except api_exceptions.GoogleAPICallError as error:
            # Flag the span as failed before attaching the exception details,
            # then let the caller see the original error.
            error_status = trace.Status(trace.StatusCode.ERROR)
            span.set_status(error_status)
            span.record_exception(error)
            raise

92 

93 

def _get_final_attributes(attributes=None, client=None, api_request=None, retry=None):
    """Assemble the complete attribute mapping for a new span.

    Sources are merged in this order, later ones overriding earlier ones:
    the module default attributes, the client-identifying attributes,
    attributes derived from ``api_request``, attributes derived from
    ``retry``, and finally the caller-supplied ``attributes``.

    Args:
        attributes: Optional mapping of caller-supplied attributes.
        client: Optional client, forwarded to the request-attribute helper.
        api_request: Optional dict describing the API request.
        retry: Optional ``Retry`` or ``ConditionalRetryPolicy``; any other
            value contributes nothing.

    Returns:
        A new dict containing the merged attributes with every ``None``
        value removed.
    """
    merged = {**_default_attributes, **_cloud_trace_adoption_attrs}

    if api_request:
        request_attrs = _set_api_request_attr(api_request, client)
        merged.update(request_attrs)

    if isinstance(retry, api_retry.Retry):
        merged.update(_set_retry_attr(retry))
    if isinstance(retry, ConditionalRetryPolicy):
        # A conditional policy wraps a plain retry; report the wrapped
        # retry's settings together with the policy's own predicate.
        wrapped_retry = retry.retry_policy
        merged.update(_set_retry_attr(wrapped_retry, retry.conditional_predicate))

    if attributes:
        merged.update(attributes)

    return {key: value for key, value in merged.items() if value is not None}

109 

110 

111def _set_api_request_attr(request, client): 

112 attr = {} 

113 if request.get("method"): 

114 attr["http.request.method"] = request.get("method") 

115 if request.get("path"): 

116 full_url = client._connection.build_api_url(request.get("path")) 

117 attr.update(_get_opentelemetry_attributes_from_url(full_url, strip_query=True)) 

118 if "timeout" in request: 

119 attr["connect_timeout,read_timeout"] = str(request.get("timeout")) 

120 return attr 

121 

122 

123def _set_retry_attr(retry, conditional_predicate=None): 

124 predicate = conditional_predicate if conditional_predicate else retry._predicate 

125 retry_info = f"multiplier{retry._multiplier}/deadline{retry._deadline}/max{retry._maximum}/initial{retry._initial}/predicate{predicate}" 

126 return {"retry": retry_info} 

127 

128 

129def _get_opentelemetry_attributes_from_url(url, strip_query=True): 

130 """Helper to assemble OpenTelemetry span attributes from a URL.""" 

131 u = urlparse(url) 

132 netloc = u.netloc 

133 # u.hostname is always lowercase. We parse netloc to preserve casing. 

134 # netloc format: [userinfo@]host[:port] 

135 if "@" in netloc: 

136 netloc = netloc.split("@", 1)[1] 

137 if ":" in netloc and not netloc.endswith("]"): # Handle IPv6 literal 

138 netloc = netloc.split(":", 1)[0] 

139 

140 attributes = { 

141 "server.address": netloc, 

142 "server.port": u.port, 

143 "url.scheme": u.scheme, 

144 "url.path": u.path, 

145 } 

146 if not strip_query: 

147 attributes["url.query"] = u.query 

148 

149 return attributes