Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py: 39%
120 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15# pylint: disable=too-many-ancestors, unused-import
17from logging import getLogger
18from typing import Dict, Generator, Iterable, List, Optional, Union
20# This kind of import is needed to avoid Sphinx errors.
21import opentelemetry.sdk.metrics
22from opentelemetry.metrics import CallbackT
23from opentelemetry.metrics import Counter as APICounter
24from opentelemetry.metrics import Histogram as APIHistogram
25from opentelemetry.metrics import ObservableCounter as APIObservableCounter
26from opentelemetry.metrics import ObservableGauge as APIObservableGauge
27from opentelemetry.metrics import (
28 ObservableUpDownCounter as APIObservableUpDownCounter,
29)
30from opentelemetry.metrics import UpDownCounter as APIUpDownCounter
31from opentelemetry.metrics._internal.instrument import CallbackOptions
32from opentelemetry.sdk.metrics._internal.measurement import Measurement
33from opentelemetry.sdk.util.instrumentation import InstrumentationScope
# Module-level logger named after this module. It is used further down to
# warn about rejected (negative) amounts and to report failing callbacks.
_logger = getLogger(__name__)


# Template for the error raised when an instrument name or unit is rejected
# during construction; "{}" is filled in with the offending value.
_ERROR_MESSAGE = (
    "Expected ASCII string of maximum length 63 characters but got {}"
)
class _Synchronous:
    """Shared constructor for the synchronous instruments in this module.

    ``Counter``, ``UpDownCounter`` and ``Histogram`` list this class first
    among their bases, followed by the matching API instrument class.
    ``_check_name_unit_description`` is not defined here; it is expected to
    come from that sibling base (hence the ``no-member`` pylint waiver).
    """

    def __init__(
        self,
        name: str,
        instrumentation_scope: InstrumentationScope,
        measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer",
        unit: str = "",
        description: str = "",
    ):
        """Validate the identifying fields and store the instrument state.

        Args:
            name: Instrument name; stored lower-cased on ``self.name``.
            instrumentation_scope: Scope stored on the instance as-is.
            measurement_consumer: Object that later receives the
                measurements produced by the instrument.
            unit: Unit of the recorded values.
            description: Free-form description.

        Raises:
            ValueError: If the validator reports ``None`` for the name or
                for the unit. ``ValueError`` derives from ``Exception``, so
                handlers written against the former bare ``Exception`` keep
                working.
        """
        # pylint: disable=no-member
        result = self._check_name_unit_description(name, unit, description)

        if result["name"] is None:
            raise ValueError(_ERROR_MESSAGE.format(name))

        if result["unit"] is None:
            raise ValueError(_ERROR_MESSAGE.format(unit))

        # From here on use the values handed back by the validator.
        name = result["name"]
        unit = result["unit"]
        description = result["description"]

        self.name = name.lower()
        self.unit = unit
        self.description = description
        self.instrumentation_scope = instrumentation_scope
        self._measurement_consumer = measurement_consumer
        # The next class in the MRO receives the validated name in its
        # original case; only ``self.name`` is lower-cased.
        super().__init__(name, unit=unit, description=description)
class _Asynchronous:
    """Shared behaviour for the observable instruments in this module.

    ``ObservableCounter``, ``ObservableUpDownCounter`` and
    ``ObservableGauge`` list this class first among their bases, followed by
    the matching API instrument class. ``_check_name_unit_description`` is
    not defined here; it is expected to come from that sibling base (hence
    the ``no-member`` pylint waiver).
    """

    def __init__(
        self,
        name: str,
        instrumentation_scope: InstrumentationScope,
        measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer",
        callbacks: Optional[Iterable[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ):
        """Validate the identifying fields and register the callbacks.

        Args:
            name: Instrument name; stored lower-cased on ``self.name``.
            instrumentation_scope: Scope stored on the instance as-is.
            measurement_consumer: Stored on the instance as
                ``_measurement_consumer``.
            callbacks: Optional iterable of callbacks. Each entry is either
                a callable taking the callback options, or a generator that
                receives the options through ``send``.
            unit: Unit of the observed values.
            description: Free-form description.

        Raises:
            ValueError: If the validator reports ``None`` for the name or
                for the unit. ``ValueError`` derives from ``Exception``, so
                handlers written against the former bare ``Exception`` keep
                working.
        """
        # pylint: disable=no-member
        result = self._check_name_unit_description(name, unit, description)

        if result["name"] is None:
            raise ValueError(_ERROR_MESSAGE.format(name))

        if result["unit"] is None:
            raise ValueError(_ERROR_MESSAGE.format(unit))

        # From here on use the values handed back by the validator.
        name = result["name"]
        unit = result["unit"]
        description = result["description"]

        self.name = name.lower()
        self.unit = unit
        self.description = description
        self.instrumentation_scope = instrumentation_scope
        self._measurement_consumer = measurement_consumer
        super().__init__(name, callbacks, unit=unit, description=description)

        self._callbacks: List[CallbackT] = []

        if callbacks is not None:

            for callback in callbacks:

                if isinstance(callback, Generator):

                    # advance generator to its first yield
                    next(callback)

                    # ``callback=callback`` binds the current generator as a
                    # default argument so each wrapper keeps its own
                    # generator instead of the loop's last one.
                    def inner(
                        options: CallbackOptions,
                        callback=callback,
                    ) -> Iterable[Measurement]:
                        try:
                            return callback.send(options)
                        except StopIteration:
                            # An exhausted generator reports nothing.
                            return []

                    self._callbacks.append(inner)
                else:
                    self._callbacks.append(callback)

    def callback(
        self, callback_options: CallbackOptions
    ) -> Iterable[Measurement]:
        """Run every registered callback and yield SDK measurements.

        Each value returned by a callback is re-wrapped in a ``Measurement``
        bound to this instrument. An exception raised by one callback is
        logged and does not stop the remaining callbacks from running.
        """
        for registered_callback in self._callbacks:
            try:
                for api_measurement in registered_callback(callback_options):
                    yield Measurement(
                        api_measurement.value,
                        instrument=self,
                        attributes=api_measurement.attributes,
                    )
            except Exception:  # pylint: disable=broad-except
                _logger.exception(
                    "Callback failed for instrument %s.", self.name
                )
class Counter(_Synchronous, APICounter):
    """Synchronous counter that only accepts non-negative increments."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; ``Counter`` itself may not.
        if cls is not Counter:
            return super().__new__(cls)
        raise TypeError("Counter must be instantiated via a meter.")

    def add(
        self, amount: Union[int, float], attributes: Dict[str, str] = None
    ):
        """Forward ``amount`` to the measurement consumer.

        A negative ``amount`` is not forwarded; a warning is logged instead.
        """
        if amount < 0:
            _logger.warning(
                "Add amount must be non-negative on Counter %s.", self.name
            )
        else:
            consume = self._measurement_consumer.consume_measurement
            consume(Measurement(amount, self, attributes))
class UpDownCounter(_Synchronous, APIUpDownCounter):
    """Synchronous counter whose increments are forwarded without a sign check."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; ``UpDownCounter`` itself may not.
        if cls is not UpDownCounter:
            return super().__new__(cls)
        raise TypeError("UpDownCounter must be instantiated via a meter.")

    def add(
        self, amount: Union[int, float], attributes: Dict[str, str] = None
    ):
        """Forward ``amount`` to the measurement consumer."""
        consume = self._measurement_consumer.consume_measurement
        consume(Measurement(amount, self, attributes))
class ObservableCounter(_Asynchronous, APIObservableCounter):
    """Observable counter built on the shared ``_Asynchronous`` behaviour."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; ``ObservableCounter`` itself may not.
        if cls is not ObservableCounter:
            return super().__new__(cls)
        raise TypeError(
            "ObservableCounter must be instantiated via a meter."
        )
class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):
    """Observable up-down counter built on the shared ``_Asynchronous`` behaviour."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; this class itself may not.
        if cls is not ObservableUpDownCounter:
            return super().__new__(cls)
        raise TypeError(
            "ObservableUpDownCounter must be instantiated via a meter."
        )
class Histogram(_Synchronous, APIHistogram):
    """Synchronous histogram that only accepts non-negative amounts."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; ``Histogram`` itself may not.
        if cls is not Histogram:
            return super().__new__(cls)
        raise TypeError("Histogram must be instantiated via a meter.")

    def record(
        self, amount: Union[int, float], attributes: Dict[str, str] = None
    ):
        """Forward ``amount`` to the measurement consumer.

        A negative ``amount`` is not forwarded; a warning is logged instead.
        """
        if amount < 0:
            _logger.warning(
                "Record amount must be non-negative on Histogram %s.",
                self.name,
            )
        else:
            consume = self._measurement_consumer.consume_measurement
            consume(Measurement(amount, self, attributes))
class ObservableGauge(_Asynchronous, APIObservableGauge):
    """Observable gauge built on the shared ``_Asynchronous`` behaviour."""

    def __new__(cls, *args, **kwargs):
        # Subclasses may be constructed; ``ObservableGauge`` itself may not.
        if cls is not ObservableGauge:
            return super().__new__(cls)
        raise TypeError(
            "ObservableGauge must be instantiated via a meter."
        )
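# The classes below exist to prevent direct instantiation of the public
# instrument classes: each public class refuses to construct itself in
# __new__, while these private subclasses can be instantiated.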
class _Counter(Counter):
    """Instantiable ``Counter``; it passes the ``cls is Counter`` guard."""
class _UpDownCounter(UpDownCounter):
    """Instantiable ``UpDownCounter``; it passes the guard in ``__new__``."""
class _ObservableCounter(ObservableCounter):
    """Instantiable ``ObservableCounter``; it passes the guard in ``__new__``."""
class _ObservableUpDownCounter(ObservableUpDownCounter):
    """Instantiable ``ObservableUpDownCounter``; it passes the guard in ``__new__``."""
class _Histogram(Histogram):
    """Instantiable ``Histogram``; it passes the ``cls is Histogram`` guard."""
class _ObservableGauge(ObservableGauge):
    """Instantiable ``ObservableGauge``; it passes the guard in ``__new__``."""