Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery/opentelemetry_tracing.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

66 statements  

1# Copyright 2020 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import logging 

16from contextlib import contextmanager 

17from google.api_core.exceptions import GoogleAPICallError # type: ignore 

18 

19logger = logging.getLogger(__name__) 

20try: 

21 from opentelemetry import trace # type: ignore 

22 from opentelemetry.instrumentation.utils import http_status_to_status_code # type: ignore 

23 from opentelemetry.trace.status import Status # type: ignore 

24 

25 HAS_OPENTELEMETRY = True 

26 _warned_telemetry = True 

27 

28except ImportError: 

29 HAS_OPENTELEMETRY = False 

30 _warned_telemetry = False 

31 

32_default_attributes = { 

33 "db.system": "BigQuery" 

34} # static, default values assigned to all spans 

35 

36 

37@contextmanager 

38def create_span(name, attributes=None, client=None, job_ref=None): 

39 """Creates a ContextManager for a Span to be exported to the configured exporter. 

40 If no configuration exists yields None. 

41 

42 Args: 

43 name (str): Name that will be set for the span being created 

44 attributes (Optional[dict]): 

45 Additional attributes that pertain to 

46 the specific API call (i.e. not a default attribute) 

47 client (Optional[google.cloud.bigquery.client.Client]): 

48 Pass in a Client object to extract any attributes that may be 

49 relevant to it and add them to the created spans. 

50 job_ref (Optional[google.cloud.bigquery.job._AsyncJob]) 

51 Pass in a _AsyncJob object to extract any attributes that may be 

52 relevant to it and add them to the created spans. 

53 

54 Yields: 

55 opentelemetry.trace.Span: Yields the newly created Span. 

56 

57 Raises: 

58 google.api_core.exceptions.GoogleAPICallError: 

59 Raised if a span could not be yielded or issue with call to 

60 OpenTelemetry. 

61 """ 

62 global _warned_telemetry 

63 final_attributes = _get_final_span_attributes(attributes, client, job_ref) 

64 if not HAS_OPENTELEMETRY: 

65 if not _warned_telemetry: 

66 logger.debug( 

67 "This service is instrumented using OpenTelemetry. " 

68 "OpenTelemetry or one of its components could not be imported; " 

69 "please add compatible versions of opentelemetry-api and " 

70 "opentelemetry-instrumentation packages in order to get BigQuery " 

71 "Tracing data." 

72 ) 

73 _warned_telemetry = True 

74 

75 yield None 

76 return 

77 tracer = trace.get_tracer(__name__) 

78 

79 # yield new span value 

80 with tracer.start_as_current_span(name=name, attributes=final_attributes) as span: 

81 try: 

82 yield span 

83 except GoogleAPICallError as error: 

84 if error.code is not None: 

85 span.set_status(Status(http_status_to_status_code(error.code))) 

86 raise 

87 

88 

89def _get_final_span_attributes(attributes=None, client=None, job_ref=None): 

90 """Compiles attributes from: client, job_ref, user-provided attributes. 

91 

92 Attributes from all of these sources are merged together. Note the 

93 attributes are added sequentially based on perceived order of precedence: 

94 i.e. attributes added last may overwrite attributes added earlier. 

95 

96 Args: 

97 attributes (Optional[dict]): 

98 Additional attributes that pertain to 

99 the specific API call (i.e. not a default attribute) 

100 

101 client (Optional[google.cloud.bigquery.client.Client]): 

102 Pass in a Client object to extract any attributes that may be 

103 relevant to it and add them to the final_attributes 

104 

105 job_ref (Optional[google.cloud.bigquery.job._AsyncJob]) 

106 Pass in a _AsyncJob object to extract any attributes that may be 

107 relevant to it and add them to the final_attributes. 

108 

109 Returns: dict 

110 """ 

111 

112 collected_attributes = _default_attributes.copy() 

113 

114 if client: 

115 collected_attributes.update(_set_client_attributes(client)) 

116 if job_ref: 

117 collected_attributes.update(_set_job_attributes(job_ref)) 

118 if attributes: 

119 collected_attributes.update(attributes) 

120 

121 final_attributes = {k: v for k, v in collected_attributes.items() if v is not None} 

122 return final_attributes 

123 

124 

125def _set_client_attributes(client): 

126 return {"db.name": client.project, "location": client.location} 

127 

128 

129def _set_job_attributes(job_ref): 

130 job_attributes = { 

131 "db.name": job_ref.project, 

132 "job_id": job_ref.job_id, 

133 "state": job_ref.state, 

134 } 

135 

136 job_attributes["hasErrors"] = job_ref.error_result is not None 

137 

138 if job_ref.created is not None: 

139 job_attributes["timeCreated"] = job_ref.created.isoformat() 

140 

141 if job_ref.started is not None: 

142 job_attributes["timeStarted"] = job_ref.started.isoformat() 

143 

144 if job_ref.ended is not None: 

145 job_attributes["timeEnded"] = job_ref.ended.isoformat() 

146 

147 if job_ref.location is not None: 

148 job_attributes["location"] = job_ref.location 

149 

150 if job_ref.parent_job_id is not None: 

151 job_attributes["parent_job_id"] = job_ref.parent_job_id 

152 

153 if job_ref.num_child_jobs is not None: 

154 job_attributes["num_child_jobs"] = job_ref.num_child_jobs 

155 

156 total_bytes_billed = getattr(job_ref, "total_bytes_billed", None) 

157 if total_bytes_billed is not None: 

158 job_attributes["total_bytes_billed"] = total_bytes_billed 

159 

160 total_bytes_processed = getattr(job_ref, "total_bytes_processed", None) 

161 if total_bytes_processed is not None: 

162 job_attributes["total_bytes_processed"] = total_bytes_processed 

163 

164 return job_attributes