Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/grpc/_observability.py: 54%

78 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1# Copyright 2023 The gRPC authors. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import annotations 

16 

17import abc 

18import contextlib 

19import logging 

20import threading 

21from typing import Any, Generator, Generic, List, Optional, TypeVar 

22 

23from grpc._cython import cygrpc as _cygrpc 

24 

25_LOGGER = logging.getLogger(__name__) 

26 

27_channel = Any # _channel.py imports this module. 

28ClientCallTracerCapsule = TypeVar("ClientCallTracerCapsule") 

29ServerCallTracerFactoryCapsule = TypeVar("ServerCallTracerFactoryCapsule") 

30 

31_plugin_lock: threading.RLock = threading.RLock() 

32_OBSERVABILITY_PLUGIN: Optional["ObservabilityPlugin"] = None 

33_SERVICES_TO_EXCLUDE: List[bytes] = [ 

34 b"google.monitoring.v3.MetricService", 

35 b"google.devtools.cloudtrace.v2.TraceService", 

36] 

37 

38 

39class ObservabilityPlugin( 

40 Generic[ClientCallTracerCapsule, ServerCallTracerFactoryCapsule], 

41 metaclass=abc.ABCMeta, 

42): 

43 """Abstract base class for observability plugin. 

44 

45 *This is a semi-private class that was intended for the exclusive use of 

46 the gRPC team.* 

47 

48 The ClientCallTracerCapsule and ClientCallTracerCapsule created by this 

49 plugin should be inject to gRPC core using observability_init at the 

50 start of a program, before any channels/servers are built. 

51 

52 Any future methods added to this interface cannot have the 

53 @abc.abstractmethod annotation. 

54 

55 Attributes: 

56 _stats_enabled: A bool indicates whether tracing is enabled. 

57 _tracing_enabled: A bool indicates whether stats(metrics) is enabled. 

58 """ 

59 

60 _tracing_enabled: bool = False 

61 _stats_enabled: bool = False 

62 

63 @abc.abstractmethod 

64 def create_client_call_tracer( 

65 self, method_name: bytes 

66 ) -> ClientCallTracerCapsule: 

67 """Creates a ClientCallTracerCapsule. 

68 

69 After register the plugin, if tracing or stats is enabled, this method 

70 will be called after a call was created, the ClientCallTracer created 

71 by this method will be saved to call context. 

72 

73 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

74 interface and wrapped in a PyCapsule using `client_call_tracer` as name. 

75 

76 Args: 

77 method_name: The method name of the call in byte format. 

78 

79 Returns: 

80 A PyCapsule which stores a ClientCallTracer object. 

81 """ 

82 raise NotImplementedError() 

83 

84 @abc.abstractmethod 

85 def delete_client_call_tracer( 

86 self, client_call_tracer: ClientCallTracerCapsule 

87 ) -> None: 

88 """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. 

89 

90 After register the plugin, if tracing or stats is enabled, this method 

91 will be called at the end of the call to destroy the ClientCallTracer. 

92 

93 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

94 interface and wrapped in a PyCapsule using `client_call_tracer` as name. 

95 

96 Args: 

97 client_call_tracer: A PyCapsule which stores a ClientCallTracer object. 

98 """ 

99 raise NotImplementedError() 

100 

101 @abc.abstractmethod 

102 def save_trace_context( 

103 self, trace_id: str, span_id: str, is_sampled: bool 

104 ) -> None: 

105 """Saves the trace_id and span_id related to the current span. 

106 

107 After register the plugin, if tracing is enabled, this method will be 

108 called after the server finished sending response. 

109 

110 This method can be used to propagate census context. 

111 

112 Args: 

113 trace_id: The identifier for the trace associated with the span as a 

114 32-character hexadecimal encoded string, 

115 e.g. 26ed0036f2eff2b7317bccce3e28d01f 

116 span_id: The identifier for the span as a 16-character hexadecimal encoded 

117 string. e.g. 113ec879e62583bc 

118 is_sampled: A bool indicates whether the span is sampled. 

119 """ 

120 raise NotImplementedError() 

121 

122 @abc.abstractmethod 

123 def create_server_call_tracer_factory( 

124 self, 

125 ) -> ServerCallTracerFactoryCapsule: 

126 """Creates a ServerCallTracerFactoryCapsule. 

127 

128 After register the plugin, if tracing or stats is enabled, this method 

129 will be called by calling observability_init, the ServerCallTracerFactory 

130 created by this method will be registered to gRPC core. 

131 

132 The ServerCallTracerFactory is an object which implements 

133 `grpc_core::ServerCallTracerFactory` interface and wrapped in a PyCapsule 

134 using `server_call_tracer_factory` as name. 

135 

136 Returns: 

137 A PyCapsule which stores a ServerCallTracerFactory object. 

138 """ 

139 raise NotImplementedError() 

140 

141 @abc.abstractmethod 

142 def record_rpc_latency( 

143 self, method: str, rpc_latency: float, status_code: Any 

144 ) -> None: 

145 """Record the latency of the RPC. 

146 

147 After register the plugin, if stats is enabled, this method will be 

148 called at the end of each RPC. 

149 

150 Args: 

151 method: The fully-qualified name of the RPC method being invoked. 

152 rpc_latency: The latency for the RPC, equals to the time between 

153 when the client invokes the RPC and when the client receives the status. 

154 status_code: An element of grpc.StatusCode in string format representing the 

155 final status for the RPC. 

156 """ 

157 raise NotImplementedError() 

158 

159 def set_tracing(self, enable: bool) -> None: 

160 """Enable or disable tracing. 

161 

162 Args: 

163 enable: A bool indicates whether tracing should be enabled. 

164 """ 

165 self._tracing_enabled = enable 

166 

167 def set_stats(self, enable: bool) -> None: 

168 """Enable or disable stats(metrics). 

169 

170 Args: 

171 enable: A bool indicates whether stats should be enabled. 

172 """ 

173 self._stats_enabled = enable 

174 

175 @property 

176 def tracing_enabled(self) -> bool: 

177 return self._tracing_enabled 

178 

179 @property 

180 def stats_enabled(self) -> bool: 

181 return self._stats_enabled 

182 

183 @property 

184 def observability_enabled(self) -> bool: 

185 return self.tracing_enabled or self.stats_enabled 

186 

187 

188@contextlib.contextmanager 

189def get_plugin() -> Generator[Optional[ObservabilityPlugin], None, None]: 

190 """Get the ObservabilityPlugin in _observability module. 

191 

192 Returns: 

193 The ObservabilityPlugin currently registered with the _observability 

194 module. Or None if no plugin exists at the time of calling this method. 

195 """ 

196 with _plugin_lock: 

197 yield _OBSERVABILITY_PLUGIN 

198 

199 

200def set_plugin(observability_plugin: Optional[ObservabilityPlugin]) -> None: 

201 """Save ObservabilityPlugin to _observability module. 

202 

203 Args: 

204 observability_plugin: The ObservabilityPlugin to save. 

205 

206 Raises: 

207 ValueError: If an ObservabilityPlugin was already registered at the 

208 time of calling this method. 

209 """ 

210 global _OBSERVABILITY_PLUGIN # pylint: disable=global-statement 

211 with _plugin_lock: 

212 if observability_plugin and _OBSERVABILITY_PLUGIN: 

213 raise ValueError("observability_plugin was already set!") 

214 _OBSERVABILITY_PLUGIN = observability_plugin 

215 

216 

217def observability_init(observability_plugin: ObservabilityPlugin) -> None: 

218 """Initialize observability with provided ObservabilityPlugin. 

219 

220 This method have to be called at the start of a program, before any 

221 channels/servers are built. 

222 

223 Args: 

224 observability_plugin: The ObservabilityPlugin to use. 

225 

226 Raises: 

227 ValueError: If an ObservabilityPlugin was already registered at the 

228 time of calling this method. 

229 """ 

230 set_plugin(observability_plugin) 

231 try: 

232 _cygrpc.set_server_call_tracer_factory(observability_plugin) 

233 except Exception: # pylint:disable=broad-except 

234 _LOGGER.exception("Failed to set server call tracer factory!") 

235 

236 

237def observability_deinit() -> None: 

238 """Clear the observability context, including ObservabilityPlugin and 

239 ServerCallTracerFactory 

240 

241 This method have to be called after exit observability context so that 

242 it's possible to re-initialize again. 

243 """ 

244 set_plugin(None) 

245 _cygrpc.clear_server_call_tracer_factory() 

246 

247 

248def delete_call_tracer(client_call_tracer_capsule: Any) -> None: 

249 """Deletes the ClientCallTracer stored in ClientCallTracerCapsule. 

250 

251 This method will be called at the end of the call to destroy the ClientCallTracer. 

252 

253 The ClientCallTracer is an object which implements `grpc_core::ClientCallTracer` 

254 interface and wrapped in a PyCapsule using `client_call_tracer` as the name. 

255 

256 Args: 

257 client_call_tracer_capsule: A PyCapsule which stores a ClientCallTracer object. 

258 """ 

259 with get_plugin() as plugin: 

260 if not (plugin and plugin.observability_enabled): 

261 return 

262 plugin.delete_client_call_tracer(client_call_tracer_capsule) 

263 

264 

265def maybe_record_rpc_latency(state: "_channel._RPCState") -> None: 

266 """Record the latency of the RPC, if the plugin is registered and stats is enabled. 

267 

268 This method will be called at the end of each RPC. 

269 

270 Args: 

271 state: a grpc._channel._RPCState object which contains the stats related to the 

272 RPC. 

273 """ 

274 # TODO(xuanwn): use channel args to exclude those metrics. 

275 for exclude_prefix in _SERVICES_TO_EXCLUDE: 

276 if exclude_prefix in state.method.encode("utf8"): 

277 return 

278 with get_plugin() as plugin: 

279 if not (plugin and plugin.stats_enabled): 

280 return 

281 rpc_latency_s = state.rpc_end_time - state.rpc_start_time 

282 rpc_latency_ms = rpc_latency_s * 1000 

283 plugin.record_rpc_latency(state.method, rpc_latency_ms, state.code)