Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_observability.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

79 statements  

1# Copyright 2023 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import annotations 

16 

17import abc 

18import contextlib 

19import logging 

20import threading 

21from typing import Any, Generator, Generic, List, Optional, TypeVar 

22 

23from grpc._cython import cygrpc as _cygrpc 

24 

25_LOGGER = logging.getLogger(__name__) 

26 

27_channel = Any # _channel.py imports this module. 

28ClientCallTracerCapsule = TypeVar("ClientCallTracerCapsule") 

29ServerCallTracerFactoryCapsule = TypeVar("ServerCallTracerFactoryCapsule") 

30 

31_plugin_lock: threading.RLock = threading.RLock() 

32_OBSERVABILITY_PLUGIN: Optional["ObservabilityPlugin"] = None 

33_SERVICES_TO_EXCLUDE: List[bytes] = [ 

34 b"google.monitoring.v3.MetricService", 

35 b"google.devtools.cloudtrace.v2.TraceService", 

36] 

37 

38 

39class ObservabilityPlugin( 

40 Generic[ClientCallTracerCapsule, ServerCallTracerFactoryCapsule], 

41 metaclass=abc.ABCMeta, 

42): 

43 """Abstract base class for observability plugin. 

44 

45 *This is a semi-private class that was intended for the exclusive use of 

46 the gRPC team.* 

47 

48 The ClientCallTracerCapsule and ClientCallTracerCapsule created by this 

49 plugin should be inject to gRPC core using observability_init at the 

50 start of a program, before any channels/servers are built. 

51 

52 Any future methods added to this interface cannot have the 

53 @abc.abstractmethod annotation. 

54 

55 Attributes: 

56 _stats_enabled: A bool indicates whether tracing is enabled. 

57 _tracing_enabled: A bool indicates whether stats(metrics) is enabled. 

58 _registered_methods: A set which stores the registered method names in 

59 bytes. 

60 """ 

61 

62 _tracing_enabled: bool = False 

63 _stats_enabled: bool = False 

64 

65 @abc.abstractmethod 

66 def create_client_call_tracer( 

67 self, method_name: bytes, target: bytes 

68 ) -> ClientCallTracerCapsule: 

69 """Creates a ClientCallTracerCapsule. 

70 

71 After register the plugin, if tracing or stats is enabled, this method 

72 will be called after a call was created, the ClientCallTracer created 

73 by this method will be saved to call context. 

74 

75 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

76 interface and wrapped in a PyCapsule using `client_call_tracer` as name. 

77 

78 Args: 

79 method_name: The method name of the call in byte format. 

80 target: The channel target of the call in byte format. 

81 registered_method: Wether this method is pre-registered. 

82 

83 Returns: 

84 A PyCapsule which stores a ClientCallTracer object. 

85 """ 

86 raise NotImplementedError() 

87 

88 @abc.abstractmethod 

89 def delete_client_call_tracer( 

90 self, client_call_tracer: ClientCallTracerCapsule 

91 ) -> None: 

92 """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. 

93 

94 After register the plugin, if tracing or stats is enabled, this method 

95 will be called at the end of the call to destroy the ClientCallTracer. 

96 

97 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

98 interface and wrapped in a PyCapsule using `client_call_tracer` as name. 

99 

100 Args: 

101 client_call_tracer: A PyCapsule which stores a ClientCallTracer object. 

102 """ 

103 raise NotImplementedError() 

104 

105 @abc.abstractmethod 

106 def save_trace_context( 

107 self, trace_id: str, span_id: str, is_sampled: bool 

108 ) -> None: 

109 """Saves the trace_id and span_id related to the current span. 

110 

111 After register the plugin, if tracing is enabled, this method will be 

112 called after the server finished sending response. 

113 

114 This method can be used to propagate census context. 

115 

116 Args: 

117 trace_id: The identifier for the trace associated with the span as a 

118 32-character hexadecimal encoded string, 

119 e.g. 26ed0036f2eff2b7317bccce3e28d01f 

120 span_id: The identifier for the span as a 16-character hexadecimal encoded 

121 string. e.g. 113ec879e62583bc 

122 is_sampled: A bool indicates whether the span is sampled. 

123 """ 

124 raise NotImplementedError() 

125 

126 @abc.abstractmethod 

127 def create_server_call_tracer_factory( 

128 self, 

129 ) -> ServerCallTracerFactoryCapsule: 

130 """Creates a ServerCallTracerFactoryCapsule. 

131 

132 After register the plugin, if tracing or stats is enabled, this method 

133 will be called by calling observability_init, the ServerCallTracerFactory 

134 created by this method will be registered to gRPC core. 

135 

136 The ServerCallTracerFactory is an object which implements 

137 `grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule 

138 using `server_call_tracer_factory` as name. 

139 

140 Returns: 

141 A PyCapsule which stores a ServerCallTracerFactory object. 

142 """ 

143 raise NotImplementedError() 

144 

145 @abc.abstractmethod 

146 def record_rpc_latency( 

147 self, method: str, target: str, rpc_latency: float, status_code: Any 

148 ) -> None: 

149 """Record the latency of the RPC. 

150 

151 After register the plugin, if stats is enabled, this method will be 

152 called at the end of each RPC. 

153 

154 Args: 

155 method: The fully-qualified name of the RPC method being invoked. 

156 target: The target name of the RPC method being invoked. 

157 rpc_latency: The latency for the RPC in seconds, equals to the time between 

158 when the client invokes the RPC and when the client receives the status. 

159 status_code: An element of grpc.StatusCode in string format representing the 

160 final status for the RPC. 

161 """ 

162 raise NotImplementedError() 

163 

164 def set_tracing(self, enable: bool) -> None: 

165 """Enable or disable tracing. 

166 

167 Args: 

168 enable: A bool indicates whether tracing should be enabled. 

169 """ 

170 self._tracing_enabled = enable 

171 

172 def set_stats(self, enable: bool) -> None: 

173 """Enable or disable stats(metrics). 

174 

175 Args: 

176 enable: A bool indicates whether stats should be enabled. 

177 """ 

178 self._stats_enabled = enable 

179 

180 def save_registered_method(self, method_name: bytes) -> None: 

181 """Saves the method name to registered_method list. 

182 

183 When exporting metrics, method name for unregistered methods will be replaced 

184 with 'other' by default. 

185 

186 Args: 

187 method_name: The method name in bytes. 

188 """ 

189 raise NotImplementedError() 

190 

191 @property 

192 def tracing_enabled(self) -> bool: 

193 return self._tracing_enabled 

194 

195 @property 

196 def stats_enabled(self) -> bool: 

197 return self._stats_enabled 

198 

199 @property 

200 def observability_enabled(self) -> bool: 

201 return self.tracing_enabled or self.stats_enabled 

202 

203 

204@contextlib.contextmanager 

205def get_plugin() -> Generator[Optional[ObservabilityPlugin], None, None]: 

206 """Get the ObservabilityPlugin in _observability module. 

207 

208 Returns: 

209 The ObservabilityPlugin currently registered with the _observability 

210 module. Or None if no plugin exists at the time of calling this method. 

211 """ 

212 with _plugin_lock: 

213 yield _OBSERVABILITY_PLUGIN 

214 

215 

216def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None: 

217 """Save ObservabilityPlugin to _observability module. 

218 

219 Args: 

220 observability_plugin: The ObservabilityPlugin to save. 

221 

222 Raises: 

223 ValueError: If an ObservabilityPlugin was already registered at the 

224 time of calling this method. 

225 """ 

226 global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement 

227 with _plugin_lock: 

228 if observability_plugin and _OBSERVABILITY_PLUGIN: 

229 raise ValueError("observability_plugin was already set!") 

230 _OBSERVABILITY_PLUGIN = observability_plugin 

231 

232 

233def observability_init(observability_plugin: ObservabilityPlugin) -> None: 

234 """Initialize observability with provided ObservabilityPlugin. 

235 

236 This method have to be called at the start of a program, before any 

237 channels/servers are built. 

238 

239 Args: 

240 observability_plugin: The ObservabilityPlugin to use. 

241 

242 Raises: 

243 ValueError: If an ObservabilityPlugin was already registered at the 

244 time of calling this method. 

245 """ 

246 set_plugin(observability_plugin) 

247 try: 

248 _cygrpc.set_server_call_tracer_factory(observability_plugin) 

249 except Exception: # pylint:disable=broad-except 

250 _LOGGER.exception("Failed to set server call tracer factory!") 

251 

252 

253def observability_deinit() -> None: 

254 """Clear the observability context, including ObservabilityPlugin and 

255 ServerCallTracerFactory 

256 

257 This method have to be called after exit observability context so that 

258 it's possible to re-initialize again. 

259 """ 

260 set_plugin(None) 

261 _cygrpc.clear_server_call_tracer_factory() 

262 

263 

264def delete_call_tracer(client_call_tracer_capsule: Any) -> None: 

265 """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. 

266 

267 This method will be called at the end of the call to destroy the ClientCallTracer. 

268 

269 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

270 interface and wrapped in a PyCapsule using `client_call_tracer` as the name. 

271 

272 Args: 

273 client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object. 

274 """ 

275 with get_plugin() as plugin: 

276 if plugin and plugin.observability_enabled: 

277 plugin.delete_client_call_tracer(client_call_tracer_capsule) 

278 

279 

280def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: 

281 """Record the latency of the RPC, if the plugin is registered and stats is enabled. 

282 

283 This method will be called at the end of each RPC. 

284 

285 Args: 

286 state: a grpc._channel._RPCState object which contains the stats related to the 

287 RPC. 

288 """ 

289 # TODO(xuanwn): use channel args to exclude those metrics. 

290 for exclude_prefix in _SERVICES_TO_EXCLUDE: 

291 if exclude_prefix in state.method.encode("utf8"): 

292 return 

293 with get_plugin() as plugin: 

294 if not (plugin and plugin.stats_enabled): 

295 return 

296 rpc_latency_s = state.rpc_end_time - state.rpc_start_time 

297 rpc_latency_ms = rpc_latency_s * 1000 

298 plugin.record_rpc_latency( 

299 state.method, state.target, rpc_latency_ms, state.code 

300 )