1# Copyright 2017, Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import sys
16from datetime import datetime
17from typing import Optional
18
19from opentelemetry import trace
20from opentelemetry.trace.propagation import set_span_in_context
21from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
22
23from google.pubsub_v1 import types as gapic_types
24from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
25 OpenTelemetryContextSetter,
26)
27
28
class PublishMessageWrapper:
    """Pairs a Pub/Sub message with the OpenTelemetry spans created while publishing it.

    The wrapper owns up to three spans: a PRODUCER "create" span, and two
    INTERNAL child spans parented to it ("publisher flow control" and
    "publisher batching"). Each span is started and ended explicitly through
    the matching ``start_*`` / ``end_*`` method pair.
    """

    _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
    _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
    _OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"

    _PUBLISH_START_EVENT: str = "publish start"
    _PUBLISH_FLOW_CONTROL: str = "publisher flow control"

    def __init__(self, message: gapic_types.PubsubMessage):
        """Wrap ``message``; no spans exist until the ``start_*`` methods run."""
        self._message: gapic_types.PubsubMessage = message
        self._create_span: Optional[trace.Span] = None
        self._flow_control_span: Optional[trace.Span] = None
        self._batching_span: Optional[trace.Span] = None

    @property
    def message(self) -> gapic_types.PubsubMessage:
        """The wrapped Pub/Sub message."""
        return self._message

    @message.setter  # type: ignore[no-redef] # resetting message value is intentional here
    def message(self, message: gapic_types.PubsubMessage):
        self._message = message

    @property
    def create_span(self) -> Optional[trace.Span]:
        """The create span, or ``None`` if ``start_create_span`` has not run."""
        return self._create_span

    def __eq__(self, other):  # pragma: NO COVER
        """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message.

        Two wrappers are equal when their messages are equal; spans are not
        compared. ``NotImplemented`` is returned for any other type so Python
        falls back to the other operand's comparison (and finally to ``False``).
        """
        # The check must be on ``other``: testing ``self`` against
        # ``other.__class__`` is true for a plain ``object()`` and would then
        # fail on ``other.message``.
        if isinstance(other, PublishMessageWrapper):
            return self.message == other.message
        return NotImplemented

    @staticmethod
    def _end_span(span: trace.Span, exc: Optional[BaseException]) -> None:
        """End ``span``, first recording ``exc`` and an ERROR status if given."""
        if exc is not None:
            span.record_exception(exception=exc)
            span.set_status(trace.Status(status_code=trace.StatusCode.ERROR))
        span.end()

    def _start_child_span(self, name: str) -> trace.Span:
        """Start an INTERNAL span named ``name`` parented to the create span."""
        assert self._create_span is not None
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        # end_on_exit=False: leaving the ``with`` only stops the span from
        # being "current"; it stays open until the matching end_* call.
        with tracer.start_as_current_span(
            name=name,
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._create_span),
            end_on_exit=False,
        ) as child_span:
            return child_span

    def start_create_span(self, topic: str, ordering_key: str) -> None:
        """Start the PRODUCER create span and inject its context into the message.

        Args:
            topic: Topic path made of exactly four ``/``-separated segments
                (e.g. ``projects/<project>/topics/<topic>``). Segment 1 is
                reported as the project id and segment 3 as the topic name.
            ordering_key: Value reported as the message ordering key attribute.

        Raises:
            AssertionError: If ``topic`` does not have exactly four segments.
        """
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        topic_parts = topic.split("/")
        assert len(topic_parts) == 4
        project_id = topic_parts[1]
        topic_short_name = topic_parts[3]
        with tracer.start_as_current_span(
            name=f"{topic_short_name} create",
            attributes={
                "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
                "messaging.destination.name": topic_short_name,
                "code.function": "publish",
                "messaging.gcp_pubsub.message.ordering_key": ordering_key,
                "messaging.operation": "create",
                "gcp.project_id": project_id,
                # The attribute expects the body size in bytes. len() gives
                # the payload length; sys.getsizeof() would also count the
                # interpreter's per-object overhead.
                "messaging.message.body.size": len(self._message.data),
            },
            kind=trace.SpanKind.PRODUCER,
            end_on_exit=False,
        ) as create_span:
            create_span.add_event(
                name=self._PUBLISH_START_EVENT,
                attributes={
                    "timestamp": str(datetime.now()),
                },
            )
            self._create_span = create_span
            # Inject while the create span is still the current span so the
            # propagated context refers to it.
            TraceContextTextMapPropagator().inject(
                carrier=self._message,
                setter=OpenTelemetryContextSetter(),
            )

    def end_create_span(self, exc: Optional[BaseException] = None) -> None:
        """End the create span, marking it as errored when ``exc`` is given.

        Raises:
            AssertionError: If the create span was never started.
        """
        assert self._create_span is not None
        self._end_span(self._create_span, exc)

    def start_publisher_flow_control_span(self) -> None:
        """Start the flow control span as a child of the create span.

        Raises:
            AssertionError: If the create span was never started.
        """
        self._flow_control_span = self._start_child_span(self._PUBLISH_FLOW_CONTROL)

    def end_publisher_flow_control_span(
        self, exc: Optional[BaseException] = None
    ) -> None:
        """End the flow control span, marking it as errored when ``exc`` is given.

        Raises:
            AssertionError: If the flow control span was never started.
        """
        assert self._flow_control_span is not None
        self._end_span(self._flow_control_span, exc)

    def start_publisher_batching_span(self) -> None:
        """Start the batching span as a child of the create span.

        Raises:
            AssertionError: If the create span was never started.
        """
        self._batching_span = self._start_child_span(
            self._OPEN_TELEMETRY_PUBLISHER_BATCHING
        )

    def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
        """End the batching span, marking it as errored when ``exc`` is given.

        Raises:
            AssertionError: If the batching span was never started.
        """
        assert self._batching_span is not None
        self._end_span(self._batching_span, exc)