Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# Copyright 2024, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from typing import Optional, List 

16from datetime import datetime 

17 

18from opentelemetry import trace, context 

19from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator 

20from opentelemetry.trace.propagation import set_span_in_context 

21 

22from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( 

23 OpenTelemetryContextGetter, 

24) 

25from google.pubsub_v1.types import PubsubMessage 

26 

27_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" 

28_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" 

29 

30 

class SubscribeOpenTelemetry:
    """Holds the OpenTelemetry spans created on the subscribe side for one message.

    One instance wraps a single received ``PubsubMessage``. The ``start_*``
    methods create spans and store them on the instance; the matching
    ``end_*`` methods end them. Every span is started with
    ``end_on_exit=False``, so it is not ended when the ``with`` block that
    created it exits and has to be ended explicitly.

    The instance can also be used as a context manager: entering starts the
    process span and exiting ends it.
    """

    def __init__(self, message: PubsubMessage):
        self._message: PubsubMessage = message

        # Each span attribute stays ``None`` until its ``start_*`` method runs:
        #   _subscribe_span            -> start_subscribe_span
        #   _concurrency_control_span  -> start_subscribe_concurrency_control_span
        #   _scheduler_span            -> start_subscribe_scheduler_span
        #   _process_span              -> start_process_span
        self._subscribe_span: Optional[trace.Span] = None
        self._concurrency_control_span: Optional[trace.Span] = None
        self._scheduler_span: Optional[trace.Span] = None
        self._process_span: Optional[trace.Span] = None

        # Both ids are parsed out of the subscription path by
        # `start_subscribe_span` and exposed through read-only properties so
        # that other spans (e.g. process or modack spans) can reuse them.
        self._subscription_id: Optional[str] = None
        self._project_id: Optional[str] = None

        # Context extracted from the message by `start_subscribe_span`.
        # `start_process_span` uses it, when non-empty, to link the process
        # span to the publisher create span.
        self._publisher_create_span_context: Optional[context.Context] = None

    @property
    def subscription_id(self) -> Optional[str]:
        """Short subscription name, or ``None`` before `start_subscribe_span`."""
        return self._subscription_id

    @property
    def project_id(self) -> Optional[str]:
        """Project id, or ``None`` before `start_subscribe_span`."""
        return self._project_id

    @property
    def subscribe_span(self) -> Optional[trace.Span]:
        """The subscribe span, or ``None`` before `start_subscribe_span`."""
        return self._subscribe_span

    @staticmethod
    def _add_timestamped_event(span: trace.Span, event: str) -> None:
        # Shared by the subscribe/process event methods: the event carries the
        # current local time, stringified, as its only attribute.
        span.add_event(
            name=event,
            attributes={
                "timestamp": str(datetime.now()),
            },
        )

    def start_subscribe_span(
        self,
        subscription: str,
        exactly_once_enabled: bool,
        ack_id: str,
        delivery_attempt: int,
    ) -> None:
        """Start the CONSUMER subscribe span and remember it on the instance.

        Args:
            subscription: Subscription path; it must split on ``/`` into
                exactly four parts, of which the second is stored as the
                project id and the fourth as the subscription id.
            exactly_once_enabled: Recorded as a span attribute.
            ack_id: Recorded as a span attribute.
            delivery_attempt: Recorded as a span attribute.

        Raises:
            AssertionError: If ``subscription`` does not have four parts.
        """
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)

        # Pull any propagated trace context out of the message; it becomes the
        # parent context of the subscribe span when it is non-empty.
        propagator = TraceContextTextMapPropagator()
        extracted_context = propagator.extract(
            carrier=self._message,
            getter=OpenTelemetryContextGetter(),
        )
        self._publisher_create_span_context = extracted_context

        path_parts: List[str] = subscription.split("/")
        assert len(path_parts) == 4
        project_id, short_name = path_parts[1], path_parts[3]
        self._project_id = project_id
        self._subscription_id = short_name

        span_attributes = {
            "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            "messaging.destination.name": short_name,
            "gcp.project_id": project_id,
            "messaging.message.id": self._message.message_id,
            "messaging.message.body.size": len(self._message.data),
            "messaging.gcp_pubsub.message.ack_id": ack_id,
            "messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
            "messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
            "code.function": "_on_response",
            "messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
        }
        with tracer.start_as_current_span(
            name=f"{short_name} subscribe",
            # A falsy (empty) extracted context is passed as ``None``.
            context=extracted_context or None,
            kind=trace.SpanKind.CONSUMER,
            attributes=span_attributes,
            end_on_exit=False,
        ) as span:
            self._subscribe_span = span

    def add_subscribe_span_event(self, event: str) -> None:
        """Add a timestamped event named ``event`` to the subscribe span."""
        assert self._subscribe_span is not None
        self._add_timestamped_event(self._subscribe_span, event)

    def end_subscribe_span(self) -> None:
        """End the subscribe span; it must have been started."""
        assert self._subscribe_span is not None
        self._subscribe_span.end()

    def set_subscribe_span_result(self, result: str) -> None:
        """Record ``result`` on the subscribe span as ``messaging.gcp_pubsub.result``."""
        assert self._subscribe_span is not None
        self._subscribe_span.set_attribute(
            key="messaging.gcp_pubsub.result",
            value=result,
        )

    def start_subscribe_concurrency_control_span(self) -> None:
        """Start the INTERNAL concurrency control span under the subscribe span."""
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        parent_context = set_span_in_context(self._subscribe_span)
        with tracer.start_as_current_span(
            name="subscriber concurrency control",
            kind=trace.SpanKind.INTERNAL,
            context=parent_context,
            end_on_exit=False,
        ) as span:
            self._concurrency_control_span = span

    def end_subscribe_concurrency_control_span(self) -> None:
        """End the concurrency control span; it must have been started."""
        assert self._concurrency_control_span is not None
        self._concurrency_control_span.end()

    def start_subscribe_scheduler_span(self) -> None:
        """Start the INTERNAL scheduler span under the subscribe span."""
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        parent_context = set_span_in_context(self._subscribe_span)
        with tracer.start_as_current_span(
            name="subscriber scheduler",
            kind=trace.SpanKind.INTERNAL,
            context=parent_context,
            end_on_exit=False,
        ) as span:
            self._scheduler_span = span

    def end_subscribe_scheduler_span(self) -> None:
        """End the scheduler span; it must have been started."""
        assert self._scheduler_span is not None
        self._scheduler_span.end()

    def start_process_span(self) -> trace.Span:
        """Start the INTERNAL process span under the subscribe span.

        When a non-empty context was extracted from the message, the process
        span is linked to the span context found in it.

        Returns:
            The newly started process span (also stored on the instance).
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)

        publisher_link: Optional[trace.Link] = None
        publisher_context = self._publisher_create_span_context
        if publisher_context:
            publisher_span: trace.Span = trace.get_current_span(publisher_context)
            publisher_span_context: Optional[
                trace.SpanContext
            ] = publisher_span.get_span_context()
            if publisher_span_context:
                publisher_link = trace.Link(publisher_span_context)

        span_links = None
        if publisher_link:
            span_links = [publisher_link]

        with tracer.start_as_current_span(
            name=f"{self._subscription_id} process",
            attributes={
                "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            },
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            links=span_links,
            end_on_exit=False,
        ) as span:
            self._process_span = span
            return span

    def end_process_span(self) -> None:
        """End the process span; it must have been started."""
        assert self._process_span is not None
        self._process_span.end()

    def add_process_span_event(self, event: str) -> None:
        """Add a timestamped event named ``event`` to the process span."""
        assert self._process_span is not None
        self._add_timestamped_event(self._process_span, event)

    def __enter__(self) -> trace.Span:
        """Start the process span and return it."""
        return self.start_process_span()

    def __exit__(self, exc_type, exc_val, traceback):
        """End the process span if one exists; exceptions are not suppressed."""
        if self._process_span:
            self.end_process_span()

210 

211 

def start_modack_span(
    subscribe_span_links: List[trace.Link],
    subscription_id: Optional[str],
    message_count: int,
    deadline: float,
    project_id: Optional[str],
    code_function: str,
    receipt_modack: bool,
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> modack"``.

    The span is created with ``end_on_exit=False``, so it is still open when
    it is returned; ending it is left to the caller.

    Args:
        subscribe_span_links: Links attached to the new span.
        subscription_id: Used in the span name and as
            ``messaging.destination.name``; must not be ``None``.
        message_count: Recorded as ``messaging.batch.message_count``.
        deadline: Recorded as ``messaging.gcp_pubsub.message.ack_deadline``.
        project_id: Recorded as ``gcp.project_id``; must not be ``None``.
        code_function: Recorded as ``code.function``.
        receipt_modack: Recorded as ``messaging.gcp_pubsub.is_receipt_modack``.

    Returns:
        The started modack span.

    Raises:
        AssertionError: If ``subscription_id`` or ``project_id`` is ``None``.
    """
    assert subscription_id is not None
    assert project_id is not None
    tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.gcp_pubsub.message.ack_deadline": deadline,
        "messaging.destination.name": subscription_id,
        "gcp.project_id": project_id,
        "messaging.operation.name": "modack",
        "code.function": code_function,
        "messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
    }
    with tracer.start_as_current_span(
        name=f"{subscription_id} modack",
        attributes=span_attributes,
        links=subscribe_span_links,
        kind=trace.SpanKind.CLIENT,
        end_on_exit=False,
    ) as span:
        return span

241 

242 

def start_ack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> ack"``.

    The span is created with ``end_on_exit=False``, so it is still open when
    it is returned; ending it is left to the caller.

    Args:
        subscription_id: Used in the span name and as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.

    Returns:
        The started ack span.
    """
    tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.operation": "ack",
        "gcp.project_id": project_id,
        "messaging.destination.name": subscription_id,
        "code.function": "ack",
    }
    with tracer.start_as_current_span(
        name=f"{subscription_id} ack",
        attributes=span_attributes,
        kind=trace.SpanKind.CLIENT,
        links=links,
        end_on_exit=False,
    ) as span:
        return span

265 

266 

def start_nack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT span named ``"<subscription_id> nack"``.

    The span is created with ``end_on_exit=False``, so it is still open when
    it is returned; ending it is left to the caller.

    Args:
        subscription_id: Used in the span name and as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.

    Returns:
        The started nack span.
    """
    tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_attributes = {
        "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
        "messaging.batch.message_count": message_count,
        "messaging.operation": "nack",
        "gcp.project_id": project_id,
        "messaging.destination.name": subscription_id,
        # The recorded function name is "modify_ack_deadline", not "nack".
        "code.function": "modify_ack_deadline",
    }
    with tracer.start_as_current_span(
        name=f"{subscription_id} nack",
        attributes=span_attributes,
        kind=trace.SpanKind.CLIENT,
        links=links,
        end_on_exit=False,
    ) as span:
        return span
288 return nack_span