Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# Copyright 2024, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from typing import Optional, List 

16from datetime import datetime 

17 

18from opentelemetry import trace, context 

19from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator 

20from opentelemetry.trace.propagation import set_span_in_context 

21 

22from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( 

23 OpenTelemetryContextGetter, 

24) 

25from google.pubsub_v1.types import PubsubMessage 

26 

27_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" 

28_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" 

29 

30 

class SubscribeOpenTelemetry:
    """Hold and manage the OpenTelemetry spans for one received message.

    An instance wraps a single ``PubsubMessage`` and owns up to four spans:
    the subscribe span, plus the concurrency control, scheduler and process
    spans, each of which is started with the subscribe span set in its
    context. Every ``start_*`` method leaves its span open; the matching
    ``end_*`` method must be called to end it.
    """

    def __init__(self, message: PubsubMessage):
        """Store the message; no span is created until a ``start_*`` call.

        Args:
            message: The message whose attributes are used as the carrier
                for trace-context extraction and whose fields populate the
                subscribe span attributes.
        """
        self._message: PubsubMessage = message

        # subscribe span will be initialized by the `start_subscribe_span`
        # method.
        self._subscribe_span: Optional[trace.Span] = None

        # subscriber concurrency control span will be initialized by the
        # `start_subscribe_concurrency_control_span` method.
        self._concurrency_control_span: Optional[trace.Span] = None

        # scheduler span will be initialized by the
        # `start_subscribe_scheduler_span` method.
        self._scheduler_span: Optional[trace.Span] = None

        # This will be set by `start_subscribe_span` method and will be used
        # for other spans, such as process span.
        self._subscription_id: Optional[str] = None

        # This will be set by `start_process_span` method.
        self._process_span: Optional[trace.Span] = None

        # This will be set by `start_subscribe_span` method, if a publisher create span
        # context was extracted from trace propagation. And will be used by spans like
        # process span to add links to the publisher create span.
        self._publisher_create_span_context: Optional[context.Context] = None

        # This will be set by `start_subscribe_span` method and will be used
        # for other spans, such as modack span.
        self._project_id: Optional[str] = None

    @property
    def subscription_id(self) -> Optional[str]:
        """Subscription short name, or ``None`` before `start_subscribe_span`."""
        return self._subscription_id

    @property
    def project_id(self) -> Optional[str]:
        """Project id, or ``None`` before `start_subscribe_span`."""
        return self._project_id

    @property
    def subscribe_span(self) -> Optional[trace.Span]:
        """The subscribe span, or ``None`` before `start_subscribe_span`."""
        return self._subscribe_span

    def start_subscribe_span(
        self,
        subscription: str,
        exactly_once_enabled: bool,
        ack_id: str,
        delivery_attempt: int,
    ) -> None:
        """Start the CONSUMER-kind subscribe span for the wrapped message.

        Also records the project id, the subscription short name and the
        extracted propagation context on the instance for later spans.

        Args:
            subscription: Subscription path made of exactly four
                "/"-separated segments; segment 1 is taken as the project id
                and segment 3 as the subscription short name (presumably
                ``projects/{project}/subscriptions/{subscription}``).
            exactly_once_enabled: Recorded as the
                ``messaging.gcp_pubsub.message.exactly_once_delivery``
                attribute.
            ack_id: Recorded as the ``messaging.gcp_pubsub.message.ack_id``
                attribute.
            delivery_attempt: Recorded as the
                ``messaging.gcp_pubsub.message.delivery_attempt`` attribute.

        Raises:
            AssertionError: If ``subscription`` does not split into exactly
                four segments.
        """
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        parent_span_context = TraceContextTextMapPropagator().extract(
            carrier=self._message,
            getter=OpenTelemetryContextGetter(),
        )
        self._publisher_create_span_context = parent_span_context

        # Split the path once and reuse both pieces below, instead of
        # re-splitting the string when building the span attributes.
        split_subscription: List[str] = subscription.split("/")
        assert len(split_subscription) == 4
        project_id = split_subscription[1]
        subscription_short_name = split_subscription[3]
        self._project_id = project_id
        self._subscription_id = subscription_short_name

        with tracer.start_as_current_span(
            name=f"{subscription_short_name} subscribe",
            # A falsy (e.g. empty) extracted context is replaced by None so
            # the tracer falls back to its own default context.
            context=parent_span_context if parent_span_context else None,
            kind=trace.SpanKind.CONSUMER,
            attributes={
                "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
                "messaging.destination.name": subscription_short_name,
                "gcp.project_id": project_id,
                "messaging.message.id": self._message.message_id,
                "messaging.message.body.size": len(self._message.data),
                "messaging.gcp_pubsub.message.ack_id": ack_id,
                "messaging.gcp_pubsub.message.ordering_key": self._message.ordering_key,
                "messaging.gcp_pubsub.message.exactly_once_delivery": exactly_once_enabled,
                "code.function": "_on_response",
                "messaging.gcp_pubsub.message.delivery_attempt": delivery_attempt,
            },
            # Keep the span open after the `with` block; it is ended
            # explicitly by `end_subscribe_span`.
            end_on_exit=False,
        ) as subscribe_span:
            self._subscribe_span = subscribe_span

    def add_subscribe_span_event(self, event: str) -> None:
        """Add an event named ``event`` to the subscribe span.

        The event carries a ``timestamp`` attribute holding
        ``str(datetime.now())``.
        NOTE(review): ``datetime.now()`` is naive local time - confirm that
        this is what consumers of the event expect.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        self._subscribe_span.add_event(
            name=event,
            attributes={
                "timestamp": str(datetime.now()),
            },
        )

    def end_subscribe_span(self) -> None:
        """End the subscribe span.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        self._subscribe_span.end()

    def set_subscribe_span_result(self, result: str) -> None:
        """Set the ``messaging.gcp_pubsub.result`` attribute on the subscribe span.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        self._subscribe_span.set_attribute(
            key="messaging.gcp_pubsub.result",
            value=result,
        )

    def start_subscribe_concurrency_control_span(self) -> None:
        """Start the INTERNAL "subscriber concurrency control" span.

        The span is created with the subscribe span set in its context and is
        left open until `end_subscribe_concurrency_control_span` is called.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        with tracer.start_as_current_span(
            name="subscriber concurrency control",
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            end_on_exit=False,
        ) as concurrency_control_span:
            self._concurrency_control_span = concurrency_control_span

    def end_subscribe_concurrency_control_span(self) -> None:
        """End the concurrency control span.

        Raises:
            AssertionError: If the concurrency control span was never started.
        """
        assert self._concurrency_control_span is not None
        self._concurrency_control_span.end()

    def start_subscribe_scheduler_span(self) -> None:
        """Start the INTERNAL "subscriber scheduler" span.

        The span is created with the subscribe span set in its context and is
        left open until `end_subscribe_scheduler_span` is called.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        with tracer.start_as_current_span(
            name="subscriber scheduler",
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            end_on_exit=False,
        ) as scheduler_span:
            self._scheduler_span = scheduler_span

    def end_subscribe_scheduler_span(self) -> None:
        """End the scheduler span.

        Raises:
            AssertionError: If the scheduler span was never started.
        """
        assert self._scheduler_span is not None
        self._scheduler_span.end()

    def start_process_span(self) -> None:
        """Start the INTERNAL "<subscription id> process" span.

        The span is created with the subscribe span set in its context. When
        a truthy propagation context was stored by `start_subscribe_span`,
        the span found in that context is attached as a link. The span stays
        open until `end_process_span` is called.

        Raises:
            AssertionError: If the subscribe span has not been started.
        """
        assert self._subscribe_span is not None
        tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
        publish_create_span_link: Optional[trace.Link] = None
        if self._publisher_create_span_context:
            publish_create_span: trace.Span = trace.get_current_span(
                self._publisher_create_span_context
            )
            span_context: Optional[
                trace.SpanContext
            ] = publish_create_span.get_span_context()
            publish_create_span_link = (
                trace.Link(span_context) if span_context else None
            )

        with tracer.start_as_current_span(
            name=f"{self._subscription_id} process",
            attributes={
                "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            },
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._subscribe_span),
            # Pass None rather than an empty list when there is no link.
            links=[publish_create_span_link] if publish_create_span_link else None,
            end_on_exit=False,
        ) as process_span:
            self._process_span = process_span

    def end_process_span(self) -> None:
        """End the process span.

        Raises:
            AssertionError: If the process span was never started.
        """
        assert self._process_span is not None
        self._process_span.end()

    def add_process_span_event(self, event: str) -> None:
        """Add an event named ``event`` to the process span.

        The event carries a ``timestamp`` attribute holding
        ``str(datetime.now())``.

        Raises:
            AssertionError: If the process span was never started.
        """
        assert self._process_span is not None
        self._process_span.add_event(
            name=event,
            attributes={
                "timestamp": str(datetime.now()),
            },
        )

202 

203 

def start_modack_span(
    subscribe_span_links: List[trace.Link],
    subscription_id: Optional[str],
    message_count: int,
    deadline: float,
    project_id: Optional[str],
    code_function: str,
    receipt_modack: bool,
) -> trace.Span:
    """Start and return a CLIENT-kind "<subscription_id> modack" span.

    The returned span is still open; the caller is responsible for ending it.

    Args:
        subscribe_span_links: Links attached to the new span.
        subscription_id: Used in the span name and as
            ``messaging.destination.name``. Must not be ``None``.
        message_count: Recorded as ``messaging.batch.message_count``.
        deadline: Recorded as ``messaging.gcp_pubsub.message.ack_deadline``.
        project_id: Recorded as ``gcp.project_id``. Must not be ``None``.
        code_function: Recorded as ``code.function``.
        receipt_modack: Recorded as ``messaging.gcp_pubsub.is_receipt_modack``.

    Raises:
        AssertionError: If ``subscription_id`` or ``project_id`` is ``None``.
    """
    assert subscription_id is not None
    assert project_id is not None

    modack_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_manager = modack_tracer.start_as_current_span(
        name=f"{subscription_id} modack",
        attributes={
            "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            "messaging.batch.message_count": message_count,
            "messaging.gcp_pubsub.message.ack_deadline": deadline,
            "messaging.destination.name": subscription_id,
            "gcp.project_id": project_id,
            "messaging.operation.name": "modack",
            "code.function": code_function,
            "messaging.gcp_pubsub.is_receipt_modack": receipt_modack,
        },
        links=subscribe_span_links,
        kind=trace.SpanKind.CLIENT,
        # Leaving the `with` block below must not end the span.
        end_on_exit=False,
    )
    with span_manager as started_span:
        return started_span

233 

234 

def start_ack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT-kind "<subscription_id> ack" span.

    The returned span is still open; the caller is responsible for ending it.

    Args:
        subscription_id: Used in the span name and as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.
    """
    ack_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_manager = ack_tracer.start_as_current_span(
        name=f"{subscription_id} ack",
        attributes={
            "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            "messaging.batch.message_count": message_count,
            "messaging.operation": "ack",
            "gcp.project_id": project_id,
            "messaging.destination.name": subscription_id,
            "code.function": "ack",
        },
        kind=trace.SpanKind.CLIENT,
        links=links,
        # Leaving the `with` block below must not end the span.
        end_on_exit=False,
    )
    with span_manager as started_span:
        return started_span

257 

258 

def start_nack_span(
    subscription_id: str,
    message_count: int,
    project_id: str,
    links: List[trace.Link],
) -> trace.Span:
    """Start and return a CLIENT-kind "<subscription_id> nack" span.

    The returned span is still open; the caller is responsible for ending it.

    Args:
        subscription_id: Used in the span name and as
            ``messaging.destination.name``.
        message_count: Recorded as ``messaging.batch.message_count``.
        project_id: Recorded as ``gcp.project_id``.
        links: Links attached to the new span.
    """
    nack_tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
    span_manager = nack_tracer.start_as_current_span(
        name=f"{subscription_id} nack",
        attributes={
            "messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
            "messaging.batch.message_count": message_count,
            "messaging.operation": "nack",
            "gcp.project_id": project_id,
            "messaging.destination.name": subscription_id,
            "code.function": "modify_ack_deadline",
        },
        kind=trace.SpanKind.CLIENT,
        links=links,
        # Leaving the `with` block below must not end the span.
        end_on_exit=False,
    )
    with span_manager as started_span:
        return started_span