Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1# Copyright 2017, Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import sys 

16from datetime import datetime 

17from typing import Optional 

18 

19from opentelemetry import trace 

20from opentelemetry.trace.propagation import set_span_in_context 

21from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator 

22 

23from google.pubsub_v1 import types as gapic_types 

24from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( 

25 OpenTelemetryContextSetter, 

26) 

27 

28 

class PublishMessageWrapper:
    """Pair a Pub/Sub message with the OpenTelemetry spans opened for it.

    The wrapper holds the message being published and up to three spans
    that are started and ended explicitly through the methods below:

    * the "create" span (``start_create_span`` / ``end_create_span``),
    * the "publisher flow control" span, and
    * the "publisher batching" span.

    The flow control and batching spans are started with the create span
    as their parent context, so ``start_create_span`` must be called first.

    Instances define ``__eq__`` without ``__hash__`` and are therefore
    unhashable.
    """

    _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
    _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
    _OPEN_TELEMETRY_PUBLISHER_BATCHING: str = "publisher batching"

    _PUBLISH_START_EVENT: str = "publish start"
    _PUBLISH_FLOW_CONTROL: str = "publisher flow control"

    def __init__(self, message: gapic_types.PubsubMessage):
        """Wrap ``message``; no span is started until requested."""
        self._message: gapic_types.PubsubMessage = message
        self._create_span: Optional[trace.Span] = None
        self._flow_control_span: Optional[trace.Span] = None
        self._batching_span: Optional[trace.Span] = None

    @property
    def message(self):
        """The wrapped Pub/Sub message."""
        return self._message

    @message.setter  # type: ignore[no-redef] # resetting message value is intentional here
    def message(self, message: gapic_types.PubsubMessage):
        self._message = message

    @property
    def create_span(self):
        """The create span, or ``None`` if ``start_create_span`` has not run."""
        return self._create_span

    def __eq__(self, other):  # pragma: NO COVER
        """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message"""
        # Check the type of `other`, not of `self`: an unrelated object has
        # no `message` attribute, and NotImplemented lets Python fall back
        # to its default comparison (which yields False for `==`).
        if not isinstance(other, PublishMessageWrapper):
            return NotImplemented
        return self.message == other.message

    def start_create_span(self, topic: str, ordering_key: str) -> None:
        """Start the create span and inject its trace context into the message.

        Args:
            topic: Topic path made of exactly four "/"-separated parts.
                The second part is reported as ``gcp.project_id`` and the
                fourth as the destination name.
            ordering_key: Value recorded as the message ordering key
                attribute on the span.

        Raises:
            AssertionError: If ``topic`` does not split into four parts.
        """
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        topic_parts = topic.split("/")
        assert len(topic_parts) == 4
        project_id = topic_parts[1]
        topic_short_name = topic_parts[3]
        with tracer.start_as_current_span(
            name=f"{topic_short_name} create",
            attributes={
                "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
                "messaging.destination.name": topic_short_name,
                "code.function": "publish",
                "messaging.gcp_pubsub.message.ordering_key": ordering_key,
                "messaging.operation": "create",
                "gcp.project_id": project_id,
                # The attribute expects the body size in bytes. len() counts
                # only the payload, whereas sys.getsizeof() would also
                # include the Python object overhead.
                # NOTE(review): assumes `data` is a bytes payload - confirm.
                "messaging.message.body.size": len(self._message.data),
            },
            kind=trace.SpanKind.PRODUCER,
            # The span outlives this block; end_create_span() closes it.
            end_on_exit=False,
        ) as create_span:
            create_span.add_event(
                name=self._PUBLISH_START_EVENT,
                attributes={
                    "timestamp": str(datetime.now()),
                },
            )
            self._create_span = create_span
            # Injection happens inside the `with` block, i.e. while the
            # create span is the current span.
            TraceContextTextMapPropagator().inject(
                carrier=self._message,
                setter=OpenTelemetryContextSetter(),
            )

    def end_create_span(self, exc: Optional[BaseException] = None) -> None:
        """End the create span, recording ``exc`` as an error if given.

        Raises:
            AssertionError: If the create span was never started.
        """
        assert self._create_span is not None
        self._end_span(self._create_span, exc)

    def start_publisher_flow_control_span(self) -> None:
        """Start the flow control span as a child of the create span.

        Raises:
            AssertionError: If the create span was never started.
        """
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        assert self._create_span is not None
        with tracer.start_as_current_span(
            name=self._PUBLISH_FLOW_CONTROL,
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._create_span),
            # Closed later by end_publisher_flow_control_span().
            end_on_exit=False,
        ) as flow_control_span:
            self._flow_control_span = flow_control_span

    def end_publisher_flow_control_span(
        self, exc: Optional[BaseException] = None
    ) -> None:
        """End the flow control span, recording ``exc`` as an error if given.

        Raises:
            AssertionError: If the flow control span was never started.
        """
        assert self._flow_control_span is not None
        self._end_span(self._flow_control_span, exc)

    def start_publisher_batching_span(self) -> None:
        """Start the batching span as a child of the create span.

        Raises:
            AssertionError: If the create span was never started.
        """
        assert self._create_span is not None
        tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
        with tracer.start_as_current_span(
            name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
            kind=trace.SpanKind.INTERNAL,
            context=set_span_in_context(self._create_span),
            # Closed later by end_publisher_batching_span().
            end_on_exit=False,
        ) as batching_span:
            self._batching_span = batching_span

    def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
        """End the batching span, recording ``exc`` as an error if given.

        Raises:
            AssertionError: If the batching span was never started.
        """
        assert self._batching_span is not None
        self._end_span(self._batching_span, exc)

    @staticmethod
    def _end_span(span: trace.Span, exc: Optional[BaseException]) -> None:
        """End ``span``; when ``exc`` is truthy, record it and set ERROR status first."""
        if exc:
            span.record_exception(exception=exc)
            span.set_status(trace.Status(status_code=trace.StatusCode.ERROR))
        span.end()