Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/export/__init__.py: 33%
175 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import os
16from abc import ABC, abstractmethod
17from enum import Enum
18from logging import getLogger
19from os import environ, linesep
20from sys import stdout
21from threading import Event, Lock, RLock, Thread
22from time import time_ns
23from typing import IO, Callable, Dict, Iterable, Optional
25from typing_extensions import final
27# This kind of import is needed to avoid Sphinx errors.
28import opentelemetry.sdk.metrics._internal
29from opentelemetry.context import (
30 _SUPPRESS_INSTRUMENTATION_KEY,
31 attach,
32 detach,
33 set_value,
34)
35from opentelemetry.sdk.environment_variables import (
36 OTEL_METRIC_EXPORT_INTERVAL,
37 OTEL_METRIC_EXPORT_TIMEOUT,
38)
39from opentelemetry.sdk.metrics._internal.aggregation import (
40 AggregationTemporality,
41 DefaultAggregation,
42)
43from opentelemetry.sdk.metrics._internal.instrument import (
44 Counter,
45 Histogram,
46 ObservableCounter,
47 ObservableGauge,
48 ObservableUpDownCounter,
49 UpDownCounter,
50 _Counter,
51 _Histogram,
52 _ObservableCounter,
53 _ObservableGauge,
54 _ObservableUpDownCounter,
55 _UpDownCounter,
56)
57from opentelemetry.sdk.metrics._internal.point import MetricsData
58from opentelemetry.util._once import Once
# Module-level logger used by the readers/exporters below for warnings and
# export-failure reporting.
_logger = getLogger(__name__)
class MetricExportResult(Enum):
    """Outcome reported by a metric exporter for a single export call.

    Can be any of the following values:"""

    # The batch was exported successfully.
    SUCCESS = 0
    # The batch could not be exported.
    FAILURE = 1
class MetricExporter(ABC):
    """Interface for exporting metrics.

    Interface to be implemented by services that want to export metrics received
    in their own format.

    Args:
        preferred_temporality: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to
            configure exporter level preferred temporality. See `opentelemetry.sdk.metrics.export.MetricReader` for
            more details on what preferred temporality is.
        preferred_aggregation: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to
            configure exporter level preferred aggregation. See `opentelemetry.sdk.metrics.export.MetricReader` for
            more details on what preferred aggregation is.
    """

    def __init__(
        self,
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ) -> None:
        # Stored (not validated here) so PeriodicExportingMetricReader can
        # defer its reader-level temporality/aggregation configuration to the
        # exporter; MetricReader.__init__ performs the validation.
        self._preferred_temporality = preferred_temporality
        self._preferred_aggregation = preferred_aggregation

    @abstractmethod
    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Exports a batch of telemetry data.

        Args:
            metrics_data: The `opentelemetry.sdk.metrics.export.MetricsData` to be exported
            timeout_millis: Time budget for the export, in milliseconds

        Returns:
            The result of the export
        """

    @abstractmethod
    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """
        Ensure that export of any metrics currently received by the exporter
        are completed as soon as possible.
        """

    @abstractmethod
    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """
class ConsoleMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that prints metrics to the
    console.

    This class can be used for diagnostic purposes. It prints the exported
    metrics to the console STDOUT.
    """

    def __init__(
        self,
        out: IO = stdout,
        formatter: Callable[
            ["opentelemetry.sdk.metrics.export.MetricsData"], str
        ] = lambda metrics_data: metrics_data.to_json()
        + linesep,
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        # Destination stream and render function; each export renders the
        # batch via self.formatter and writes it to self.out.
        self.out = out
        self.formatter = formatter

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        rendered = self.formatter(metrics_data)
        self.out.write(rendered)
        self.out.flush()
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # Nothing to release: the output stream is owned by the caller.
        pass

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # export() already flushes after every batch.
        return True
class MetricReader(ABC):
    """
    Base class for all metric readers

    Args:
        preferred_temporality: A mapping between instrument classes and
            aggregation temporality. By default uses CUMULATIVE for all instrument
            classes. This mapping will be used to define the default aggregation
            temporality of every instrument class. If the user wants to make a
            change in the default aggregation temporality of an instrument class,
            it is enough to pass here a dictionary whose keys are the instrument
            classes and the values are the corresponding desired aggregation
            temporalities of the classes that the user wants to change, not all of
            them. The classes not included in the passed dictionary will retain
            their association to their default aggregation temporalities.
        preferred_aggregation: A mapping between instrument classes and
            aggregation instances. By default maps all instrument classes to an
            instance of `DefaultAggregation`. This mapping will be used to
            define the default aggregation of every instrument class. If the
            user wants to make a change in the default aggregation of an
            instrument class, it is enough to pass here a dictionary whose keys
            are the instrument classes and the values are the corresponding
            desired aggregation for the instrument classes that the user wants
            to change, not necessarily all of them. The classes not included in
            the passed dictionary will retain their association to their
            default aggregations. The aggregation defined here will be
            overridden by an aggregation defined by a view that is not
            `DefaultAggregation`.

    Raises:
        Exception: if a ``preferred_temporality`` value is not CUMULATIVE or
            DELTA, or if a key of either mapping is not one of the six public
            instrument classes.

    .. document protected _receive_metrics which is a intended to be overridden by subclass
    .. automethod:: _receive_metrics
    """

    def __init__(
        self,
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ) -> None:
        # Set by the MeterProvider through _set_collect_callback when this
        # reader is registered; collect() refuses to run until then.
        self._collect: Callable[
            [
                "opentelemetry.sdk.metrics.export.MetricReader",
                AggregationTemporality,
            ],
            Iterable["opentelemetry.sdk.metrics.export.Metric"],
        ] = None

        # Public (API) instrument classes accepted in the preferred_* mappings,
        # keyed to the internal SDK instrument classes used everywhere else.
        api_to_internal = {
            Counter: _Counter,
            UpDownCounter: _UpDownCounter,
            Histogram: _Histogram,
            ObservableCounter: _ObservableCounter,
            ObservableUpDownCounter: _ObservableUpDownCounter,
            ObservableGauge: _ObservableGauge,
        }

        # Default temporality: CUMULATIVE for every instrument class.
        self._instrument_class_temporality = {
            internal: AggregationTemporality.CUMULATIVE
            for internal in api_to_internal.values()
        }

        if preferred_temporality is not None:
            # Validate every requested temporality value before touching the
            # defaults, matching the original two-pass behavior.
            for temporality in preferred_temporality.values():
                if temporality not in (
                    AggregationTemporality.CUMULATIVE,
                    AggregationTemporality.DELTA,
                ):
                    raise Exception(
                        f"Invalid temporality value found {temporality}"
                    )

            for typ, temporality in preferred_temporality.items():
                internal = api_to_internal.get(typ)
                if internal is None:
                    raise Exception(f"Invalid instrument class found {typ}")
                self._instrument_class_temporality[internal] = temporality

        self._preferred_temporality = preferred_temporality

        # Default aggregation: a distinct DefaultAggregation instance per
        # instrument class.
        self._instrument_class_aggregation = {
            internal: DefaultAggregation()
            for internal in api_to_internal.values()
        }

        if preferred_aggregation is not None:
            for typ, aggregation in preferred_aggregation.items():
                internal = api_to_internal.get(typ)
                if internal is None:
                    raise Exception(f"Invalid instrument class found {typ}")
                self._instrument_class_aggregation[internal] = aggregation

    @final
    def collect(self, timeout_millis: float = 10_000) -> None:
        """Collects the metrics from the internal SDK state and
        invokes the `_receive_metrics` with the collection.

        Args:
            timeout_millis: Amount of time in milliseconds before this function
              raises a timeout error.

        If any of the underlying ``collect`` methods called by this method
        fails by any reason (including timeout) an exception will be raised
        detailing the individual errors that caused this function to fail.
        """
        if self._collect is None:
            _logger.warning(
                "Cannot call collect on a MetricReader until it is registered on a MeterProvider"
            )
            return

        self._receive_metrics(
            self._collect(self, timeout_millis=timeout_millis),
            timeout_millis=timeout_millis,
        )

    @final
    def _set_collect_callback(
        self,
        func: Callable[
            [
                "opentelemetry.sdk.metrics.export.MetricReader",
                AggregationTemporality,
            ],
            Iterable["opentelemetry.sdk.metrics.export.Metric"],
        ],
    ) -> None:
        """This function is internal to the SDK. It should not be called or overridden by users"""
        self._collect = func

    @abstractmethod
    def _receive_metrics(
        self,
        metrics_data: "opentelemetry.sdk.metrics.export.MetricsData",
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        """Called by `MetricReader.collect` when it receives a batch of metrics"""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # A reader is "flushed" by forcing one synchronous collection cycle.
        self.collect(timeout_millis=timeout_millis)
        return True

    @abstractmethod
    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shuts down the MetricReader. This method provides a way
        for the MetricReader to do any cleanup required. A metric reader can
        only be shutdown once, any subsequent calls are ignored and return
        failure status.

        When a `MetricReader` is registered on a
        :class:`~opentelemetry.sdk.metrics.MeterProvider`,
        :meth:`~opentelemetry.sdk.metrics.MeterProvider.shutdown` will invoke this
        automatically.
        """
class InMemoryMetricReader(MetricReader):
    """Implementation of `MetricReader` that returns its metrics from :func:`get_metrics_data`.

    This is useful for e.g. unit tests.
    """

    def __init__(
        self,
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ) -> None:
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        # Reentrant lock: collect() below triggers _receive_metrics() on the
        # same thread while the lock is already held.
        self._lock = RLock()
        self._metrics_data: (
            "opentelemetry.sdk.metrics.export.MetricsData"
        ) = None

    def get_metrics_data(
        self,
    ) -> ("opentelemetry.sdk.metrics.export.MetricsData"):
        """Reads and returns current metrics from the SDK"""
        with self._lock:
            self.collect()
            # Hand the stored batch to the caller and reset the buffer.
            collected, self._metrics_data = self._metrics_data, None
        return collected

    def _receive_metrics(
        self,
        metrics_data: "opentelemetry.sdk.metrics.export.MetricsData",
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        with self._lock:
            self._metrics_data = metrics_data

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # No background resources to release.
        pass
class PeriodicExportingMetricReader(MetricReader):
    """`PeriodicExportingMetricReader` is an implementation of `MetricReader`
    that collects metrics based on a user-configurable time interval, and passes the
    metrics to the configured exporter.

    The configured exporter's :py:meth:`~MetricExporter.export` method will not be called
    concurrently.

    Args:
        exporter: the exporter that receives each collected batch; its
            preferred temporality/aggregation also configure this reader.
        export_interval_millis: interval between collections; defaults to the
            ``OTEL_METRIC_EXPORT_INTERVAL`` environment variable or 60000.
        export_timeout_millis: time budget per collection/export; defaults to
            the ``OTEL_METRIC_EXPORT_TIMEOUT`` environment variable or 30000.
    """

    def __init__(
        self,
        exporter: MetricExporter,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ) -> None:
        # PeriodicExportingMetricReader defers to exporter for configuration
        super().__init__(
            preferred_temporality=exporter._preferred_temporality,
            preferred_aggregation=exporter._preferred_aggregation,
        )

        # This lock is held whenever calling self._exporter.export() to prevent concurrent
        # execution of MetricExporter.export()
        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
        self._export_lock = Lock()

        self._exporter = exporter
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000)
                )
            except ValueError:
                _logger.warning(
                    "Found invalid value for export interval, using default"
                )
                export_interval_millis = 60000
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    environ.get(OTEL_METRIC_EXPORT_TIMEOUT, 30000)
                )
            except ValueError:
                _logger.warning(
                    "Found invalid value for export timeout, using default"
                )
                export_timeout_millis = 30000
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis
        self._shutdown = False
        self._shutdown_event = Event()
        self._shutdown_once = Once()
        self._daemon_thread = Thread(
            name="OtelPeriodicExportingMetricReader",
            target=self._ticker,
            daemon=True,
        )
        self._daemon_thread.start()
        if hasattr(os, "register_at_fork"):
            # The ticker thread does not survive fork(); restart it in the child.
            os.register_at_fork(
                after_in_child=self._at_fork_reinit
            )  # pylint: disable=protected-access

    def _at_fork_reinit(self):
        # Recreate the daemon thread in the forked child process.
        self._daemon_thread = Thread(
            name="OtelPeriodicExportingMetricReader",
            target=self._ticker,
            daemon=True,
        )
        self._daemon_thread.start()

    def _ticker(self) -> None:
        interval_secs = self._export_interval_millis / 1e3
        while not self._shutdown_event.wait(interval_secs):
            self.collect(timeout_millis=self._export_timeout_millis)
        # one last collection below before shutting down completely
        # Fix: the final collection previously used the export *interval* as
        # its timeout; use the configured export timeout like every other tick.
        self.collect(timeout_millis=self._export_timeout_millis)

    def _receive_metrics(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        if metrics_data is None:
            return
        # Suppress instrumentation so the exporter's own calls (HTTP, etc.)
        # do not generate telemetry recursively.
        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        try:
            with self._export_lock:
                self._exporter.export(
                    metrics_data, timeout_millis=timeout_millis
                )
        except Exception as e:  # pylint: disable=broad-except,invalid-name
            _logger.exception("Exception while exporting metrics %s", str(e))
        finally:
            # Always restore the context, even if export raised.
            detach(token)

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        deadline_ns = time_ns() + timeout_millis * 10**6

        def _shutdown():
            self._shutdown = True

        did_set = self._shutdown_once.do_once(_shutdown)
        if not did_set:
            _logger.warning("Can't shutdown multiple times")
            return

        # Stop the ticker (which performs one final collection), then give the
        # exporter whatever budget remains before the deadline.
        self._shutdown_event.set()
        self._daemon_thread.join(timeout=(deadline_ns - time_ns()) / 10**9)
        # Fix: pass the remaining budget via the keyword the exporter actually
        # declares (timeout_millis); "timeout" was silently swallowed by
        # **kwargs, so the exporter never saw a deadline.
        self._exporter.shutdown(
            timeout_millis=(deadline_ns - time_ns()) / 10**6
        )

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # Collect once via the base class, then flush the exporter itself.
        super().force_flush(timeout_millis=timeout_millis)
        self._exporter.force_flush(timeout_millis=timeout_millis)
        return True