Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/export/__init__.py: 33%

175 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15import os 

16from abc import ABC, abstractmethod 

17from enum import Enum 

18from logging import getLogger 

19from os import environ, linesep 

20from sys import stdout 

21from threading import Event, Lock, RLock, Thread 

22from time import time_ns 

23from typing import IO, Callable, Dict, Iterable, Optional 

24 

25from typing_extensions import final 

26 

27# This kind of import is needed to avoid Sphinx errors. 

28import opentelemetry.sdk.metrics._internal 

29from opentelemetry.context import ( 

30 _SUPPRESS_INSTRUMENTATION_KEY, 

31 attach, 

32 detach, 

33 set_value, 

34) 

35from opentelemetry.sdk.environment_variables import ( 

36 OTEL_METRIC_EXPORT_INTERVAL, 

37 OTEL_METRIC_EXPORT_TIMEOUT, 

38) 

39from opentelemetry.sdk.metrics._internal.aggregation import ( 

40 AggregationTemporality, 

41 DefaultAggregation, 

42) 

43from opentelemetry.sdk.metrics._internal.instrument import ( 

44 Counter, 

45 Histogram, 

46 ObservableCounter, 

47 ObservableGauge, 

48 ObservableUpDownCounter, 

49 UpDownCounter, 

50 _Counter, 

51 _Histogram, 

52 _ObservableCounter, 

53 _ObservableGauge, 

54 _ObservableUpDownCounter, 

55 _UpDownCounter, 

56) 

57from opentelemetry.sdk.metrics._internal.point import MetricsData 

58from opentelemetry.util._once import Once 

59 

60_logger = getLogger(__name__) 

61 

62 

63class MetricExportResult(Enum): 

64 """Result of exporting a metric 

65 

66 Can be any of the following values:""" 

67 

68 SUCCESS = 0 

69 FAILURE = 1 

70 

71 

72class MetricExporter(ABC): 

73 """Interface for exporting metrics. 

74 

75 Interface to be implemented by services that want to export metrics received 

76 in their own format. 

77 

78 Args: 

79 preferred_temporality: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to 

80 configure exporter level preferred temporality. See `opentelemetry.sdk.metrics.export.MetricReader` for 

81 more details on what preferred temporality is. 

82 preferred_aggregation: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to 

83 configure exporter level preferred aggregation. See `opentelemetry.sdk.metrics.export.MetricReader` for 

84 more details on what preferred aggregation is. 

85 """ 

86 

87 def __init__( 

88 self, 

89 preferred_temporality: Dict[type, AggregationTemporality] = None, 

90 preferred_aggregation: Dict[ 

91 type, "opentelemetry.sdk.metrics.view.Aggregation" 

92 ] = None, 

93 ) -> None: 

94 self._preferred_temporality = preferred_temporality 

95 self._preferred_aggregation = preferred_aggregation 

96 

97 @abstractmethod 

98 def export( 

99 self, 

100 metrics_data: MetricsData, 

101 timeout_millis: float = 10_000, 

102 **kwargs, 

103 ) -> MetricExportResult: 

104 """Exports a batch of telemetry data. 

105 

106 Args: 

107 metrics: The list of `opentelemetry.sdk.metrics.export.Metric` objects to be exported 

108 

109 Returns: 

110 The result of the export 

111 """ 

112 

113 @abstractmethod 

114 def force_flush(self, timeout_millis: float = 10_000) -> bool: 

115 """ 

116 Ensure that export of any metrics currently received by the exporter 

117 are completed as soon as possible. 

118 """ 

119 

120 @abstractmethod 

121 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: 

122 """Shuts down the exporter. 

123 

124 Called when the SDK is shut down. 

125 """ 

126 

127 

128class ConsoleMetricExporter(MetricExporter): 

129 """Implementation of :class:`MetricExporter` that prints metrics to the 

130 console. 

131 

132 This class can be used for diagnostic purposes. It prints the exported 

133 metrics to the console STDOUT. 

134 """ 

135 

136 def __init__( 

137 self, 

138 out: IO = stdout, 

139 formatter: Callable[ 

140 ["opentelemetry.sdk.metrics.export.MetricsData"], str 

141 ] = lambda metrics_data: metrics_data.to_json() 

142 + linesep, 

143 preferred_temporality: Dict[type, AggregationTemporality] = None, 

144 preferred_aggregation: Dict[ 

145 type, "opentelemetry.sdk.metrics.view.Aggregation" 

146 ] = None, 

147 ): 

148 super().__init__( 

149 preferred_temporality=preferred_temporality, 

150 preferred_aggregation=preferred_aggregation, 

151 ) 

152 self.out = out 

153 self.formatter = formatter 

154 

155 def export( 

156 self, 

157 metrics_data: MetricsData, 

158 timeout_millis: float = 10_000, 

159 **kwargs, 

160 ) -> MetricExportResult: 

161 self.out.write(self.formatter(metrics_data)) 

162 self.out.flush() 

163 return MetricExportResult.SUCCESS 

164 

165 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: 

166 pass 

167 

168 def force_flush(self, timeout_millis: float = 10_000) -> bool: 

169 return True 

170 

171 

172class MetricReader(ABC): 

173 # pylint: disable=too-many-branches 

174 """ 

175 Base class for all metric readers 

176 

177 Args: 

178 preferred_temporality: A mapping between instrument classes and 

179 aggregation temporality. By default uses CUMULATIVE for all instrument 

180 classes. This mapping will be used to define the default aggregation 

181 temporality of every instrument class. If the user wants to make a 

182 change in the default aggregation temporality of an instrument class, 

183 it is enough to pass here a dictionary whose keys are the instrument 

184 classes and the values are the corresponding desired aggregation 

185 temporalities of the classes that the user wants to change, not all of 

186 them. The classes not included in the passed dictionary will retain 

187 their association to their default aggregation temporalities. 

188 preferred_aggregation: A mapping between instrument classes and 

189 aggregation instances. By default maps all instrument classes to an 

190 instance of `DefaultAggregation`. This mapping will be used to 

191 define the default aggregation of every instrument class. If the 

192 user wants to make a change in the default aggregation of an 

193 instrument class, it is enough to pass here a dictionary whose keys 

194 are the instrument classes and the values are the corresponding 

195 desired aggregation for the instrument classes that the user wants 

196 to change, not necessarily all of them. The classes not included in 

197 the passed dictionary will retain their association to their 

198 default aggregations. The aggregation defined here will be 

199 overridden by an aggregation defined by a view that is not 

200 `DefaultAggregation`. 

201 

202 .. document protected _receive_metrics which is a intended to be overridden by subclass 

203 .. automethod:: _receive_metrics 

204 """ 

205 

206 def __init__( 

207 self, 

208 preferred_temporality: Dict[type, AggregationTemporality] = None, 

209 preferred_aggregation: Dict[ 

210 type, "opentelemetry.sdk.metrics.view.Aggregation" 

211 ] = None, 

212 ) -> None: 

213 self._collect: Callable[ 

214 [ 

215 "opentelemetry.sdk.metrics.export.MetricReader", 

216 AggregationTemporality, 

217 ], 

218 Iterable["opentelemetry.sdk.metrics.export.Metric"], 

219 ] = None 

220 

221 self._instrument_class_temporality = { 

222 _Counter: AggregationTemporality.CUMULATIVE, 

223 _UpDownCounter: AggregationTemporality.CUMULATIVE, 

224 _Histogram: AggregationTemporality.CUMULATIVE, 

225 _ObservableCounter: AggregationTemporality.CUMULATIVE, 

226 _ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, 

227 _ObservableGauge: AggregationTemporality.CUMULATIVE, 

228 } 

229 

230 if preferred_temporality is not None: 

231 for temporality in preferred_temporality.values(): 

232 if temporality not in ( 

233 AggregationTemporality.CUMULATIVE, 

234 AggregationTemporality.DELTA, 

235 ): 

236 raise Exception( 

237 f"Invalid temporality value found {temporality}" 

238 ) 

239 

240 if preferred_temporality is not None: 

241 for typ, temporality in preferred_temporality.items(): 

242 if typ is Counter: 

243 self._instrument_class_temporality[_Counter] = temporality 

244 elif typ is UpDownCounter: 

245 self._instrument_class_temporality[ 

246 _UpDownCounter 

247 ] = temporality 

248 elif typ is Histogram: 

249 self._instrument_class_temporality[ 

250 _Histogram 

251 ] = temporality 

252 elif typ is ObservableCounter: 

253 self._instrument_class_temporality[ 

254 _ObservableCounter 

255 ] = temporality 

256 elif typ is ObservableUpDownCounter: 

257 self._instrument_class_temporality[ 

258 _ObservableUpDownCounter 

259 ] = temporality 

260 elif typ is ObservableGauge: 

261 self._instrument_class_temporality[ 

262 _ObservableGauge 

263 ] = temporality 

264 else: 

265 raise Exception(f"Invalid instrument class found {typ}") 

266 

267 self._preferred_temporality = preferred_temporality 

268 self._instrument_class_aggregation = { 

269 _Counter: DefaultAggregation(), 

270 _UpDownCounter: DefaultAggregation(), 

271 _Histogram: DefaultAggregation(), 

272 _ObservableCounter: DefaultAggregation(), 

273 _ObservableUpDownCounter: DefaultAggregation(), 

274 _ObservableGauge: DefaultAggregation(), 

275 } 

276 

277 if preferred_aggregation is not None: 

278 for typ, aggregation in preferred_aggregation.items(): 

279 if typ is Counter: 

280 self._instrument_class_aggregation[_Counter] = aggregation 

281 elif typ is UpDownCounter: 

282 self._instrument_class_aggregation[ 

283 _UpDownCounter 

284 ] = aggregation 

285 elif typ is Histogram: 

286 self._instrument_class_aggregation[ 

287 _Histogram 

288 ] = aggregation 

289 elif typ is ObservableCounter: 

290 self._instrument_class_aggregation[ 

291 _ObservableCounter 

292 ] = aggregation 

293 elif typ is ObservableUpDownCounter: 

294 self._instrument_class_aggregation[ 

295 _ObservableUpDownCounter 

296 ] = aggregation 

297 elif typ is ObservableGauge: 

298 self._instrument_class_aggregation[ 

299 _ObservableGauge 

300 ] = aggregation 

301 else: 

302 raise Exception(f"Invalid instrument class found {typ}") 

303 

304 @final 

305 def collect(self, timeout_millis: float = 10_000) -> None: 

306 """Collects the metrics from the internal SDK state and 

307 invokes the `_receive_metrics` with the collection. 

308 

309 Args: 

310 timeout_millis: Amount of time in milliseconds before this function 

311 raises a timeout error. 

312 

313 If any of the underlying ``collect`` methods called by this method 

314 fails by any reason (including timeout) an exception will be raised 

315 detailing the individual errors that caused this function to fail. 

316 """ 

317 if self._collect is None: 

318 _logger.warning( 

319 "Cannot call collect on a MetricReader until it is registered on a MeterProvider" 

320 ) 

321 return 

322 

323 self._receive_metrics( 

324 self._collect(self, timeout_millis=timeout_millis), 

325 timeout_millis=timeout_millis, 

326 ) 

327 

328 @final 

329 def _set_collect_callback( 

330 self, 

331 func: Callable[ 

332 [ 

333 "opentelemetry.sdk.metrics.export.MetricReader", 

334 AggregationTemporality, 

335 ], 

336 Iterable["opentelemetry.sdk.metrics.export.Metric"], 

337 ], 

338 ) -> None: 

339 """This function is internal to the SDK. It should not be called or overridden by users""" 

340 self._collect = func 

341 

342 @abstractmethod 

343 def _receive_metrics( 

344 self, 

345 metrics_data: "opentelemetry.sdk.metrics.export.MetricsData", 

346 timeout_millis: float = 10_000, 

347 **kwargs, 

348 ) -> None: 

349 """Called by `MetricReader.collect` when it receives a batch of metrics""" 

350 

351 def force_flush(self, timeout_millis: float = 10_000) -> bool: 

352 self.collect(timeout_millis=timeout_millis) 

353 return True 

354 

355 @abstractmethod 

356 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: 

357 """Shuts down the MetricReader. This method provides a way 

358 for the MetricReader to do any cleanup required. A metric reader can 

359 only be shutdown once, any subsequent calls are ignored and return 

360 failure status. 

361 

362 When a `MetricReader` is registered on a 

363 :class:`~opentelemetry.sdk.metrics.MeterProvider`, 

364 :meth:`~opentelemetry.sdk.metrics.MeterProvider.shutdown` will invoke this 

365 automatically. 

366 """ 

367 

368 

369class InMemoryMetricReader(MetricReader): 

370 """Implementation of `MetricReader` that returns its metrics from :func:`get_metrics_data`. 

371 

372 This is useful for e.g. unit tests. 

373 """ 

374 

375 def __init__( 

376 self, 

377 preferred_temporality: Dict[type, AggregationTemporality] = None, 

378 preferred_aggregation: Dict[ 

379 type, "opentelemetry.sdk.metrics.view.Aggregation" 

380 ] = None, 

381 ) -> None: 

382 super().__init__( 

383 preferred_temporality=preferred_temporality, 

384 preferred_aggregation=preferred_aggregation, 

385 ) 

386 self._lock = RLock() 

387 self._metrics_data: ( 

388 "opentelemetry.sdk.metrics.export.MetricsData" 

389 ) = None 

390 

391 def get_metrics_data( 

392 self, 

393 ) -> ("opentelemetry.sdk.metrics.export.MetricsData"): 

394 """Reads and returns current metrics from the SDK""" 

395 with self._lock: 

396 self.collect() 

397 metrics_data = self._metrics_data 

398 self._metrics_data = None 

399 return metrics_data 

400 

401 def _receive_metrics( 

402 self, 

403 metrics_data: "opentelemetry.sdk.metrics.export.MetricsData", 

404 timeout_millis: float = 10_000, 

405 **kwargs, 

406 ) -> None: 

407 with self._lock: 

408 self._metrics_data = metrics_data 

409 

410 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: 

411 pass 

412 

413 

414class PeriodicExportingMetricReader(MetricReader): 

415 """`PeriodicExportingMetricReader` is an implementation of `MetricReader` 

416 that collects metrics based on a user-configurable time interval, and passes the 

417 metrics to the configured exporter. 

418 

419 The configured exporter's :py:meth:`~MetricExporter.export` method will not be called 

420 concurrently. 

421 """ 

422 

423 def __init__( 

424 self, 

425 exporter: MetricExporter, 

426 export_interval_millis: Optional[float] = None, 

427 export_timeout_millis: Optional[float] = None, 

428 ) -> None: 

429 # PeriodicExportingMetricReader defers to exporter for configuration 

430 super().__init__( 

431 preferred_temporality=exporter._preferred_temporality, 

432 preferred_aggregation=exporter._preferred_aggregation, 

433 ) 

434 

435 # This lock is held whenever calling self._exporter.export() to prevent concurrent 

436 # execution of MetricExporter.export() 

437 # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch 

438 self._export_lock = Lock() 

439 

440 self._exporter = exporter 

441 if export_interval_millis is None: 

442 try: 

443 export_interval_millis = float( 

444 environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000) 

445 ) 

446 except ValueError: 

447 _logger.warning( 

448 "Found invalid value for export interval, using default" 

449 ) 

450 export_interval_millis = 60000 

451 if export_timeout_millis is None: 

452 try: 

453 export_timeout_millis = float( 

454 environ.get(OTEL_METRIC_EXPORT_TIMEOUT, 30000) 

455 ) 

456 except ValueError: 

457 _logger.warning( 

458 "Found invalid value for export timeout, using default" 

459 ) 

460 export_timeout_millis = 30000 

461 self._export_interval_millis = export_interval_millis 

462 self._export_timeout_millis = export_timeout_millis 

463 self._shutdown = False 

464 self._shutdown_event = Event() 

465 self._shutdown_once = Once() 

466 self._daemon_thread = Thread( 

467 name="OtelPeriodicExportingMetricReader", 

468 target=self._ticker, 

469 daemon=True, 

470 ) 

471 self._daemon_thread.start() 

472 if hasattr(os, "register_at_fork"): 

473 os.register_at_fork( 

474 after_in_child=self._at_fork_reinit 

475 ) # pylint: disable=protected-access 

476 

477 def _at_fork_reinit(self): 

478 self._daemon_thread = Thread( 

479 name="OtelPeriodicExportingMetricReader", 

480 target=self._ticker, 

481 daemon=True, 

482 ) 

483 self._daemon_thread.start() 

484 

485 def _ticker(self) -> None: 

486 interval_secs = self._export_interval_millis / 1e3 

487 while not self._shutdown_event.wait(interval_secs): 

488 self.collect(timeout_millis=self._export_timeout_millis) 

489 # one last collection below before shutting down completely 

490 self.collect(timeout_millis=self._export_interval_millis) 

491 

492 def _receive_metrics( 

493 self, 

494 metrics_data: MetricsData, 

495 timeout_millis: float = 10_000, 

496 **kwargs, 

497 ) -> None: 

498 if metrics_data is None: 

499 return 

500 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) 

501 try: 

502 with self._export_lock: 

503 self._exporter.export( 

504 metrics_data, timeout_millis=timeout_millis 

505 ) 

506 except Exception as e: # pylint: disable=broad-except,invalid-name 

507 _logger.exception("Exception while exporting metrics %s", str(e)) 

508 detach(token) 

509 

510 def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: 

511 deadline_ns = time_ns() + timeout_millis * 10**6 

512 

513 def _shutdown(): 

514 self._shutdown = True 

515 

516 did_set = self._shutdown_once.do_once(_shutdown) 

517 if not did_set: 

518 _logger.warning("Can't shutdown multiple times") 

519 return 

520 

521 self._shutdown_event.set() 

522 self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9) 

523 self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6) 

524 

525 def force_flush(self, timeout_millis: float = 10_000) -> bool: 

526 super().force_flush(timeout_millis=timeout_millis) 

527 self._exporter.force_flush(timeout_millis=timeout_millis) 

528 return True