1# Copyright 2024, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import Optional, List
16from datetime import datetime
17
18from opentelemetry import trace, context
19from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
20from opentelemetry.trace.propagation import set_span_in_context
21
22from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
23 OpenTelemetryContextGetter,
24)
25from google.pubsub_v1.types import PubsubMessage
26
# Name handed to `trace.get_tracer()` by every span-starting helper in this
# module.
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
# Value recorded under the "messaging.system" attribute of the spans started
# in this module.
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
29
30
class SubscribeOpenTelemetry:
    """Manage the OpenTelemetry spans recorded for one received message.

    An instance wraps a single ``PubsubMessage`` and keeps references to the
    spans started for it: the subscribe span, the subscriber concurrency
    control span, the subscriber scheduler span and the process span.
    ``start_subscribe_span`` has to be called first; the other ``start_*``
    methods start their spans in a context derived from the subscribe span.

    The instance is also a context manager: entering it starts the process
    span (and returns it), exiting it ends the process span.
    """

    def __init__(self, message: PubsubMessage):
        """Store ``message``; no span is started until a ``start_*`` call."""
        self._message: PubsubMessage = message

        # subscribe span will be initialized by the `start_subscribe_span`
        # method.
        self._subscribe_span: Optional[trace.Span] = None

        # subscriber concurrency control span will be initialized by the
        # `start_subscribe_concurrency_control_span` method.
        self._concurrency_control_span: Optional[trace.Span] = None

        # scheduler span will be initialized by the
        # `start_subscribe_scheduler_span` method.
        self._scheduler_span: Optional[trace.Span] = None

        # This will be set by `start_subscribe_span` method and will be used
        # for other spans, such as process span.
        self._subscription_id: Optional[str] = None

        # This will be set by `start_process_span` method.
        self._process_span: Optional[trace.Span] = None

        # This will be set by `start_subscribe_span` method, if a publisher
        # create span context was extracted from trace propagation. It is
        # used by spans like the process span to add links to the publisher
        # create span.
        self._publisher_create_span_context: Optional[context.Context] = None

        # This will be set by `start_subscribe_span` method and will be used
        # for other spans, such as modack span.
        self._project_id: Optional[str] = None

    @property
    def subscription_id(self) -> Optional[str]:
        """Subscription ID parsed by ``start_subscribe_span``, else ``None``."""
        return self._subscription_id

    @property
    def project_id(self) -> Optional[str]:
        """Project ID parsed by ``start_subscribe_span``, else ``None``."""
        return self._project_id

    @property
    def subscribe_span(self) -> Optional[trace.Span]:
        """The subscribe span, or ``None`` if it has not been started."""
        return self._subscribe_span

    def start_subscribe_span(
        self,
        subscription: str,
        exactly_once_enabled: bool,
        ack_id: str,
        delivery_attempt: int,
    ) -> None:
        """Start the subscribe span and remember the IDs parsed from the path.

        The trace context carried by the wrapped message is extracted and
        stored so that later spans can link to it. The project ID and the
        subscription ID are taken from ``subscription`` and stored on the
        instance.

        Args:
            subscription: Subscription path made of exactly four
                ``/``-separated segments; the second segment is used as the
                project ID and the fourth as the subscription ID.
            exactly_once_enabled: Recorded as the
                ``messaging.gcp_pubsub.message.exactly_once_delivery``
                attribute.
            ack_id: Recorded as the ``messaging.gcp_pubsub.message.ack_id``
                attribute.
            delivery_attempt: Recorded as the
                ``messaging.gcp_pubsub.message.delivery_attempt`` attribute.

        Raises:
            AssertionError: If ``subscription`` does not consist of exactly
                four ``/``-separated segments.
        """
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        parent_span_context = TraceContextTextMapPropagator().extract(
            carrier=self._message,
            getter=OpenTelemetryContextGetter(),
        )
        self._publisher_create_span_context = parent_span_context

        path_segments: List[str] = subscription.split("/")
        if len(path_segments) != 4:
            # Raised explicitly instead of through an `assert` statement so
            # that the check is not stripped by `python -O`. AssertionError
            # is kept so callers observe the same exception type as before.
            raise AssertionError(
                "Expected a subscription path with 4 '/'-separated segments, "
                f"got {subscription!r}"
            )
        project_id = path_segments[1]
        subscription_short_name = path_segments[3]
        self._project_id = project_id
        self._subscription_id = subscription_short_name

        with tracer.start_as_current_span(
            name=f"{subscription_short_name} subscribe",
            # A falsy (empty) extracted context is passed on as None.
            context=parent_span_context if parent_span_context else None,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
                "messaging.destination.name": subscription_short_name,
                "gcp.project_id": project_id,
                "messaging.message.id": self._message.message_id,
                "messaging.message.body.size": len(self._message.data),
                "messaging.gcp_pubsub.message.ack_id": ack_id,
                "messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
                "messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
                "code.function": "_on_response",
                "messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
            },
            # The span must outlive this method; `end_subscribe_span` ends it.
            end_on_exit=False,
        ) as subscribe_span:
            self._subscribe_span = subscribe_span

    def add_subscribe_span_event(self, event: str) -> None:
        """Add an event named ``event`` to the subscribe span.

        The event carries a ``timestamp`` attribute holding
        ``str(datetime.now())``, i.e. a timezone-naive local time string.
        Requires the subscribe span to have been started.
        """
        assert self._subscribe_span is not None
        self._subscribe_span.add_event(
            name=event,
            attributes={
                "timestamp": str(datetime.now()),
            },
        )

    def end_subscribe_span(self) -> None:
        """End the subscribe span; it must have been started."""
        assert self._subscribe_span is not None
        self._subscribe_span.end()

    def set_subscribe_span_result(self, result: str) -> None:
        """Record ``result`` as ``messaging.gcp_pubsub.result`` on the
        subscribe span; the span must have been started."""
        assert self._subscribe_span is not None
        self._subscribe_span.set_attribute(
            key="messaging.gcp_pubsub.result",
            value=result,
        )

    def start_subscribe_concurrency_control_span(self) -> None:
        """Start the "subscriber concurrency control" span.

        The span is started in a context that has the subscribe span set as
        its span, so the subscribe span must have been started already.
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        with tracer.start_as_current_span(
            name="subscriber concurrency control",
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            # Ended later by `end_subscribe_concurrency_control_span`.
            end_on_exit=False,
        ) as concurrency_control_span:
            self._concurrency_control_span = concurrency_control_span

    def end_subscribe_concurrency_control_span(self) -> None:
        """End the concurrency control span; it must have been started."""
        assert self._concurrency_control_span is not None
        self._concurrency_control_span.end()

    def start_subscribe_scheduler_span(self) -> None:
        """Start the "subscriber scheduler" span.

        The span is started in a context that has the subscribe span set as
        its span, so the subscribe span must have been started already.
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        with tracer.start_as_current_span(
            name="subscriber scheduler",
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            # Ended later by `end_subscribe_scheduler_span`.
            end_on_exit=False,
        ) as scheduler_span:
            self._scheduler_span = scheduler_span

    def end_subscribe_scheduler_span(self) -> None:
        """End the scheduler span; it must have been started."""
        assert self._scheduler_span is not None
        self._scheduler_span.end()

    def start_process_span(self) -> trace.Span:
        """Start, store and return the process span.

        The span is started in a context that has the subscribe span set as
        its span. When a publisher create span context was stored by
        ``start_subscribe_span``, a link to that span is attached.

        Returns:
            The newly started process span (not yet ended).
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        publish_create_span_link: Optional[trace.Link] = None
        if self._publisher_create_span_context:
            # Look up the span stored in the extracted context and build a
            # link from its span context.
            publish_create_span: trace.Span = trace.get_current_span(
                self._publisher_create_span_context
            )
            span_context: Optional[
                trace.SpanContext
            ] = publish_create_span.get_span_context()
            publish_create_span_link = (
                trace.Link(span_context) if span_context else None
            )

        with tracer.start_as_current_span(
            name=f"{self._subscription_id} process",
            attributes={
                "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            },
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            links=[publish_create_span_link] if publish_create_span_link else None,
            # Ended later by `end_process_span` (or by `__exit__`).
            end_on_exit=False,
        ) as process_span:
            self._process_span = process_span
            return process_span

    def end_process_span(self) -> None:
        """End the process span; it must have been started."""
        assert self._process_span is not None
        self._process_span.end()

    def add_process_span_event(self, event: str) -> None:
        """Add an event named ``event`` to the process span.

        The event carries a ``timestamp`` attribute holding
        ``str(datetime.now())``, i.e. a timezone-naive local time string.
        Requires the process span to have been started.
        """
        assert self._process_span is not None
        self._process_span.add_event(
            name=event,
            attributes={
                "timestamp": str(datetime.now()),
            },
        )

    def __enter__(self) -> trace.Span:
        """Start the process span and return it."""
        return self.start_process_span()

    def __exit__(self, exc_type, exc_val, traceback):
        """End the process span if one was started.

        Nothing is returned, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        if self._process_span:
            self.end_process_span()
210
211
def start_modack_span(
    subscribe_span_links: List[trace.Link],
    subscription_id: Optional[str],
    message_count: int,
    deadline: float,
    project_id: Optional[str],
    code_function: str,
    receipt_modack: bool,
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> modack"``.

    The span is not ended here (``end_on_exit=False``); the caller receives
    it and is responsible for ending it.

    Args:
        subscribe_span_links: Links attached to the new span.
        subscription_id: Used in the span name and recorded as
            ``messaging.destination.name``. Must not be ``None``.
        message_count: Recorded as ``messaging.batch.message_count``.
        deadline: Recorded as ``messaging.gcp_pubsub.message.ack_deadline``.
        project_id: Recorded as ``gcp.project_id``. Must not be ``None``.
        code_function: Recorded as ``code.function``.
        receipt_modack: Recorded as
            ``messaging.gcp_pubsub.is_receipt_modack``.

    Returns:
        The started modack span.
    """
    assert subscription_id is not None
    assert project_id is not None

    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.gcp_pubsub.message.ack_deadline": deadline,
        "messaging.destination.name": subscription_id,
        "gcp.project_id": project_id,
        "messaging.operation.name": "modack",
        "code.function": code_function,
        "messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
    }

    otel_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    with otel_tracer.start_as_current_span(
        name=f"{subscription_id} modack",
        attributes=span_attributes,
        links=subscribe_span_links,
        kind=trace.SpanKind.CLIENT,
        end_on_exit=False,
    ) as span:
        return span
241
242
def start_ack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> ack"``.

    The span is not ended here (``end_on_exit=False``); the caller receives
    it and is responsible for ending it.

    Args:
        subscription_id: Used in the span name and recorded as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.

    Returns:
        The started ack span.
    """
    # NOTE(review): `start_modack_span` records its operation under
    # "messaging.operation.name" while this span uses "messaging.operation";
    # confirm which key is intended before aligning them.
    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.operation": "ack",
        "gcp.project_id": project_id,
        "messaging.destination.name": subscription_id,
        "code.function": "ack",
    }

    otel_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    with otel_tracer.start_as_current_span(
        name=f"{subscription_id} ack",
        attributes=span_attributes,
        kind=trace.SpanKind.CLIENT,
        links=links,
        end_on_exit=False,
    ) as span:
        return span
265
266
def start_nack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> nack"``.

    The span is not ended here (``end_on_exit=False``); the caller receives
    it and is responsible for ending it.

    Args:
        subscription_id: Used in the span name and recorded as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.

    Returns:
        The started nack span.
    """
    # NOTE(review): `start_modack_span` records its operation under
    # "messaging.operation.name" while this span uses "messaging.operation";
    # confirm which key is intended before aligning them.
    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.operation": "nack",
        "gcp.project_id": project_id,
        "messaging.destination.name": subscription_id,
        # The recorded function name is "modify_ack_deadline", not "nack".
        "code.function": "modify_ack_deadline",
    }

    otel_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    with otel_tracer.start_as_current_span(
        name=f"{subscription_id} nack",
        attributes=span_attributes,
        kind=trace.SpanKind.CLIENT,
        links=links,
        end_on_exit=False,
    ) as span:
        return span