1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16from contextlib import contextmanager
17from google.api_core.exceptions import GoogleAPICallError # type: ignore
18
19logger = logging.getLogger(__name__)
20try:
21 from opentelemetry import trace # type: ignore
22 from opentelemetry.instrumentation.utils import http_status_to_status_code # type: ignore
23 from opentelemetry.trace.status import Status # type: ignore
24
25 HAS_OPENTELEMETRY = True
26 _warned_telemetry = True
27
28except ImportError:
29 HAS_OPENTELEMETRY = False
30 _warned_telemetry = False
31
32_default_attributes = {
33 "db.system": "BigQuery"
34} # static, default values assigned to all spans
35
36
37@contextmanager
38def create_span(name, attributes=None, client=None, job_ref=None):
39 """Creates a ContextManager for a Span to be exported to the configured exporter.
40 If no configuration exists yields None.
41
42 Args:
43 name (str): Name that will be set for the span being created
44 attributes (Optional[dict]):
45 Additional attributes that pertain to
46 the specific API call (i.e. not a default attribute)
47 client (Optional[google.cloud.bigquery.client.Client]):
48 Pass in a Client object to extract any attributes that may be
49 relevant to it and add them to the created spans.
50 job_ref (Optional[google.cloud.bigquery.job._AsyncJob])
51 Pass in a _AsyncJob object to extract any attributes that may be
52 relevant to it and add them to the created spans.
53
54 Yields:
55 opentelemetry.trace.Span: Yields the newly created Span.
56
57 Raises:
58 google.api_core.exceptions.GoogleAPICallError:
59 Raised if a span could not be yielded or issue with call to
60 OpenTelemetry.
61 """
62 global _warned_telemetry
63 final_attributes = _get_final_span_attributes(attributes, client, job_ref)
64 if not HAS_OPENTELEMETRY:
65 if not _warned_telemetry:
66 logger.debug(
67 "This service is instrumented using OpenTelemetry. "
68 "OpenTelemetry or one of its components could not be imported; "
69 "please add compatible versions of opentelemetry-api and "
70 "opentelemetry-instrumentation packages in order to get BigQuery "
71 "Tracing data."
72 )
73 _warned_telemetry = True
74
75 yield None
76 return
77 tracer = trace.get_tracer(__name__)
78
79 # yield new span value
80 with tracer.start_as_current_span(name=name, attributes=final_attributes) as span:
81 try:
82 yield span
83 except GoogleAPICallError as error:
84 if error.code is not None:
85 span.set_status(Status(http_status_to_status_code(error.code)))
86 raise
87
88
89def _get_final_span_attributes(attributes=None, client=None, job_ref=None):
90 """Compiles attributes from: client, job_ref, user-provided attributes.
91
92 Attributes from all of these sources are merged together. Note the
93 attributes are added sequentially based on perceived order of precedence:
94 i.e. attributes added last may overwrite attributes added earlier.
95
96 Args:
97 attributes (Optional[dict]):
98 Additional attributes that pertain to
99 the specific API call (i.e. not a default attribute)
100
101 client (Optional[google.cloud.bigquery.client.Client]):
102 Pass in a Client object to extract any attributes that may be
103 relevant to it and add them to the final_attributes
104
105 job_ref (Optional[google.cloud.bigquery.job._AsyncJob])
106 Pass in a _AsyncJob object to extract any attributes that may be
107 relevant to it and add them to the final_attributes.
108
109 Returns: dict
110 """
111
112 collected_attributes = _default_attributes.copy()
113
114 if client:
115 collected_attributes.update(_set_client_attributes(client))
116 if job_ref:
117 collected_attributes.update(_set_job_attributes(job_ref))
118 if attributes:
119 collected_attributes.update(attributes)
120
121 final_attributes = {k: v for k, v in collected_attributes.items() if v is not None}
122 return final_attributes
123
124
125def _set_client_attributes(client):
126 return {"db.name": client.project, "location": client.location}
127
128
129def _set_job_attributes(job_ref):
130 job_attributes = {
131 "db.name": job_ref.project,
132 "job_id": job_ref.job_id,
133 "state": job_ref.state,
134 }
135
136 job_attributes["hasErrors"] = job_ref.error_result is not None
137
138 if job_ref.created is not None:
139 job_attributes["timeCreated"] = job_ref.created.isoformat()
140
141 if job_ref.started is not None:
142 job_attributes["timeStarted"] = job_ref.started.isoformat()
143
144 if job_ref.ended is not None:
145 job_attributes["timeEnded"] = job_ref.ended.isoformat()
146
147 if job_ref.location is not None:
148 job_attributes["location"] = job_ref.location
149
150 if job_ref.parent_job_id is not None:
151 job_attributes["parent_job_id"] = job_ref.parent_job_id
152
153 if job_ref.num_child_jobs is not None:
154 job_attributes["num_child_jobs"] = job_ref.num_child_jobs
155
156 total_bytes_billed = getattr(job_ref, "total_bytes_billed", None)
157 if total_bytes_billed is not None:
158 job_attributes["total_bytes_billed"] = total_bytes_billed
159
160 total_bytes_processed = getattr(job_ref, "total_bytes_processed", None)
161 if total_bytes_processed is not None:
162 job_attributes["total_bytes_processed"] = total_bytes_processed
163
164 return job_attributes