1# Copyright 2024, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from typing import Optional, List
16from datetime import datetime
17
18from opentelemetry import trace, context
19from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
20from opentelemetry.trace.propagation import set_span_in_context
21
22from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
23 OpenTelemetryContextGetter,
24)
25from google.pubsub_v1.types import PubsubMessage
26
27_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
28_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
29
30
31class SubscribeOpenTelemetry:
32 def __init__(self, message: PubsubMessage):
33 self._message: PubsubMessage = message
34
35 # subscribe span will be initialized by the `start_subscribe_span`
36 # method.
37 self._subscribe_span: Optional[trace.Span] = None
38
39 # subscriber concurrency control span will be initialized by the
40 # `start_subscribe_concurrency_control_span` method.
41 self._concurrency_control_span: Optional[trace.Span] = None
42
43 # scheduler span will be initialized by the
44 # `start_subscribe_scheduler_span` method.
45 self._scheduler_span: Optional[trace.Span] = None
46
47 # This will be set by `start_subscribe_span` method and will be used
48 # for other spans, such as process span.
49 self._subscription_id: Optional[str] = None
50
51 # This will be set by `start_process_span` method.
52 self._process_span: Optional[trace.Span] = None
53
54 # This will be set by `start_subscribe_span` method, if a publisher create span
55 # context was extracted from trace propagation. And will be used by spans like
56 # proces span to add links to the publisher create span.
57 self._publisher_create_span_context: Optional[context.Context] = None
58
59 # This will be set by `start_subscribe_span` method and will be used
60 # for other spans, such as modack span.
61 self._project_id: Optional[str] = None
62
63 @property
64 def subscription_id(self) -> Optional[str]:
65 return self._subscription_id
66
67 @property
68 def project_id(self) -> Optional[str]:
69 return self._project_id
70
71 @property
72 def subscribe_span(self) -> Optional[trace.Span]:
73 return self._subscribe_span
74
75 def start_subscribe_span(
76 self,
77 subscription: str,
78 exactly_once_enabled: bool,
79 ack_id: str,
80 delivery_attempt: int,
81 ) -> None:
82 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
83 parent_span_context = TraceContextTextMapPropagator().extract(
84 carrier=self._message,
85 getter=OpenTelemetryContextGetter(),
86 )
87 self._publisher_create_span_context = parent_span_context
88 split_subscription: List[str] = subscription.split("/")
89 assert len(split_subscription) == 4
90 subscription_short_name = split_subscription[3]
91 self._project_id = split_subscription[1]
92 self._subscription_id = subscription_short_name
93 with tracer.start_as_current_span(
94 name=f"{subscription_short_name} subscribe",
95 context=parent_span_context if parent_span_context else None,
96 kind=trace.SpanKind.CONSUMER,
97 attributes={
98 "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
99 "messaging.destination.name": subscription_short_name,
100 "gcp.project_id": subscription.split("/")[1],
101 "messaging.message.id": self._message.message_id,
102 "messaging.message.body.size": len(self._message.data),
103 "messaging.gcp_pubsub.message.ack_id": ack_id,
104 "messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
105 "messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
106 "code.function": "_on_response",
107 "messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
108 },
109 end_on_exit=False,
110 ) as subscribe_span:
111 self._subscribe_span = subscribe_span
112
113 def add_subscribe_span_event(self, event: str) -> None:
114 assert self._subscribe_span is not None
115 self._subscribe_span.add_event(
116 name=event,
117 attributes={
118 "timestamp": str(datetime.now()),
119 },
120 )
121
122 def end_subscribe_span(self) -> None:
123 assert self._subscribe_span is not None
124 self._subscribe_span.end()
125
126 def set_subscribe_span_result(self, result: str) -> None:
127 assert self._subscribe_span is not None
128 self._subscribe_span.set_attribute(
129 key="messaging.gcp_pubsub.result",
130 value=result,
131 )
132
133 def start_subscribe_concurrency_control_span(self) -> None:
134 assert self._subscribe_span is not None
135 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
136 with tracer.start_as_current_span(
137 name="subscriber concurrency control",
138 kind=trace.SpanKind.INTERNAL,
139 context=set_span_in_context(self._subscribe_span),
140 end_on_exit=False,
141 ) as concurrency_control_span:
142 self._concurrency_control_span = concurrency_control_span
143
144 def end_subscribe_concurrency_control_span(self) -> None:
145 assert self._concurrency_control_span is not None
146 self._concurrency_control_span.end()
147
148 def start_subscribe_scheduler_span(self) -> None:
149 assert self._subscribe_span is not None
150 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
151 with tracer.start_as_current_span(
152 name="subscriber scheduler",
153 kind=trace.SpanKind.INTERNAL,
154 context=set_span_in_context(self._subscribe_span),
155 end_on_exit=False,
156 ) as scheduler_span:
157 self._scheduler_span = scheduler_span
158
159 def end_subscribe_scheduler_span(self) -> None:
160 assert self._scheduler_span is not None
161 self._scheduler_span.end()
162
163 def start_process_span(self) -> None:
164 assert self._subscribe_span is not None
165 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
166 publish_create_span_link: Optional[trace.Link] = None
167 if self._publisher_create_span_context:
168 publish_create_span: trace.Span = trace.get_current_span(
169 self._publisher_create_span_context
170 )
171 span_context: Optional[
172 trace.SpanContext
173 ] = publish_create_span.get_span_context()
174 publish_create_span_link = (
175 trace.Link(span_context) if span_context else None
176 )
177
178 with tracer.start_as_current_span(
179 name=f"{self._subscription_id} process",
180 attributes={
181 "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
182 },
183 kind=trace.SpanKind.INTERNAL,
184 context=set_span_in_context(self._subscribe_span),
185 links=[publish_create_span_link] if publish_create_span_link else None,
186 end_on_exit=False,
187 ) as process_span:
188 self._process_span = process_span
189
190 def end_process_span(self) -> None:
191 assert self._process_span is not None
192 self._process_span.end()
193
194 def add_process_span_event(self, event: str) -> None:
195 assert self._process_span is not None
196 self._process_span.add_event(
197 name=event,
198 attributes={
199 "timestamp": str(datetime.now()),
200 },
201 )
202
203
204def start_modack_span(
205 subscribe_span_links: List[trace.Link],
206 subscription_id: Optional[str],
207 message_count: int,
208 deadline: float,
209 project_id: Optional[str],
210 code_function: str,
211 receipt_modack: bool,
212) -> trace.Span:
213 assert subscription_id is not None
214 assert project_id is not None
215 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
216 with tracer.start_as_current_span(
217 name=f"{subscription_id} modack",
218 attributes={
219 "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
220 "messaging.batch.message_count": message_count,
221 "messaging.gcp_pubsub.message.ack_deadline": deadline,
222 "messaging.destination.name": subscription_id,
223 "gcp.project_id": project_id,
224 "messaging.operation.name": "modack",
225 "code.function": code_function,
226 "messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
227 },
228 links=subscribe_span_links,
229 kind=trace.SpanKind.CLIENT,
230 end_on_exit=False,
231 ) as modack_span:
232 return modack_span
233
234
235def start_ack_span(
236 subscription_id: str,
237 message_count: int,
238 project_id: str,
239 links: List[trace.Link],
240) -> trace.Span:
241 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
242 with tracer.start_as_current_span(
243 name=f"{subscription_id} ack",
244 attributes={
245 "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
246 "messaging.batch.message_count": message_count,
247 "messaging.operation": "ack",
248 "gcp.project_id": project_id,
249 "messaging.destination.name": subscription_id,
250 "code.function": "ack",
251 },
252 kind=trace.SpanKind.CLIENT,
253 links=links,
254 end_on_exit=False,
255 ) as ack_span:
256 return ack_span
257
258
259def start_nack_span(
260 subscription_id: str,
261 message_count: int,
262 project_id: str,
263 links: List[trace.Link],
264) -> trace.Span:
265 tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
266 with tracer.start_as_current_span(
267 name=f"{subscription_id} nack",
268 attributes={
269 "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
270 "messaging.batch.message_count": message_count,
271 "messaging.operation": "nack",
272 "gcp.project_id": project_id,
273 "messaging.destination.name": subscription_id,
274 "code.function": "modify_ack_deadline",
275 },
276 kind=trace.SpanKind.CLIENT,
277 links=links,
278 end_on_exit=False,
279 ) as nack_span:
280 return nack_span