Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py: 48%
50 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# pylint: disable=unused-import
17from abc import ABC, abstractmethod
18from threading import Lock
19from time import time_ns
20from typing import Iterable, List, Mapping
22# This kind of import is needed to avoid Sphinx errors.
23import opentelemetry.sdk.metrics
24import opentelemetry.sdk.metrics._internal.instrument
25import opentelemetry.sdk.metrics._internal.sdk_configuration
26from opentelemetry.metrics._internal.instrument import CallbackOptions
27from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
28from opentelemetry.sdk.metrics._internal.measurement import Measurement
29from opentelemetry.sdk.metrics._internal.metric_reader_storage import (
30 MetricReaderStorage,
31)
32from opentelemetry.sdk.metrics._internal.point import Metric
class MeasurementConsumer(ABC):
    """Abstract interface for objects that receive measurements.

    Subclasses must implement all three methods: accepting a single
    measurement, registering an asynchronous instrument, and collecting
    metrics on behalf of a metric reader.
    """

    @abstractmethod
    def consume_measurement(self, measurement: Measurement) -> None:
        """Accept one ``measurement``.

        Args:
            measurement: The measurement to be consumed.
        """

    @abstractmethod
    def register_asynchronous_instrument(
        self,
        instrument: (
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ),
    ) -> None:
        """Make ``instrument`` known to this consumer.

        Args:
            instrument: The asynchronous instrument to register.
        """

    @abstractmethod
    def collect(
        self,
        metric_reader: "opentelemetry.sdk.metrics.MetricReader",
        timeout_millis: float = 10_000,
    ) -> Iterable[Metric]:
        """Produce the metrics for ``metric_reader``.

        Args:
            metric_reader: The reader on whose behalf metrics are collected.
            timeout_millis: Time budget for the collection, in milliseconds.

        Returns:
            An iterable of ``Metric`` objects.
        """
class SynchronousMeasurementConsumer(MeasurementConsumer):
    """Measurement consumer backed by one storage per metric reader.

    A ``MetricReaderStorage`` is created for every reader found in
    ``sdk_config.metric_readers``. Synchronous measurements are forwarded to
    all storages; asynchronous instruments are polled when ``collect`` is
    called.
    """

    def __init__(
        self,
        sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration",
    ) -> None:
        """Build the per-reader storages described by ``sdk_config``.

        Args:
            sdk_config: SDK configuration; its ``metric_readers`` attribute
                is iterated to create one storage per reader.
        """
        # Guards the list of asynchronous instruments and the callback phase
        # of ``collect``.
        self._lock = Lock()
        self._sdk_config = sdk_config
        # should never be mutated
        self._reader_storages: Mapping[
            "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage
        ] = {
            reader: MetricReaderStorage(
                sdk_config,
                reader._instrument_class_temporality,
                reader._instrument_class_aggregation,
            )
            for reader in sdk_config.metric_readers
        }
        self._async_instruments: List[
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ] = []

    def consume_measurement(self, measurement: Measurement) -> None:
        """Forward ``measurement`` to the storage of every metric reader.

        Args:
            measurement: The measurement to be consumed.
        """
        for reader_storage in self._reader_storages.values():
            reader_storage.consume_measurement(measurement)

    def register_asynchronous_instrument(
        self,
        instrument: (
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ),
    ) -> None:
        """Remember ``instrument`` so that ``collect`` invokes its callback.

        Args:
            instrument: The asynchronous instrument to register.
        """
        with self._lock:
            self._async_instruments.append(instrument)

    def collect(
        self,
        metric_reader: "opentelemetry.sdk.metrics.MetricReader",
        timeout_millis: float = 10_000,
    ) -> Iterable[Metric]:
        """Poll asynchronous instruments, then collect for ``metric_reader``.

        Every registered asynchronous instrument has its callback invoked and
        the resulting measurements are fed into the storage that belongs to
        ``metric_reader`` only.

        Args:
            metric_reader: The reader whose storage is updated and collected.
            timeout_millis: Overall time budget, in milliseconds, for running
                the callbacks.

        Returns:
            Whatever the reader's storage ``collect()`` returns.

        Raises:
            KeyError: If ``metric_reader`` has no storage in this consumer.
            MetricsTimeoutError: If the deadline has passed once a callback
                returns.
        """
        with self._lock:
            metric_reader_storage = self._reader_storages[metric_reader]
            # for now, just use the defaults
            callback_options = CallbackOptions()
            deadline_ns = time_ns() + timeout_millis * 10**6

            # 10 000 ms expressed in nanoseconds. Presumably mirrors the
            # default timeout of ``CallbackOptions`` -- TODO confirm.
            default_timeout_ns = 10000 * 10**6

            for async_instrument in self._async_instruments:
                remaining_ns = deadline_ns - time_ns()

                if remaining_ns < default_timeout_ns:
                    # ``CallbackOptions`` takes milliseconds, while the
                    # remaining budget is tracked in nanoseconds: convert
                    # before handing it to the callback.
                    callback_options = CallbackOptions(
                        timeout_millis=remaining_ns / 10**6
                    )

                measurements = async_instrument.callback(callback_options)
                if time_ns() >= deadline_ns:
                    raise MetricsTimeoutError(
                        "Timed out while executing callback"
                    )

                for measurement in measurements:
                    metric_reader_storage.consume_measurement(measurement)

        return metric_reader_storage.collect()