Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py: 23%
163 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from atexit import register, unregister
16from logging import getLogger
17from threading import Lock
18from time import time_ns
19from typing import Optional, Sequence
21# This kind of import is needed to avoid Sphinx errors.
22import opentelemetry.sdk.metrics
23from opentelemetry.metrics import Counter as APICounter
24from opentelemetry.metrics import Histogram as APIHistogram
25from opentelemetry.metrics import Meter as APIMeter
26from opentelemetry.metrics import MeterProvider as APIMeterProvider
27from opentelemetry.metrics import NoOpMeter
28from opentelemetry.metrics import ObservableCounter as APIObservableCounter
29from opentelemetry.metrics import ObservableGauge as APIObservableGauge
30from opentelemetry.metrics import (
31 ObservableUpDownCounter as APIObservableUpDownCounter,
32)
33from opentelemetry.metrics import UpDownCounter as APIUpDownCounter
34from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
35from opentelemetry.sdk.metrics._internal.instrument import (
36 _Counter,
37 _Histogram,
38 _ObservableCounter,
39 _ObservableGauge,
40 _ObservableUpDownCounter,
41 _UpDownCounter,
42)
43from opentelemetry.sdk.metrics._internal.measurement_consumer import (
44 MeasurementConsumer,
45 SynchronousMeasurementConsumer,
46)
47from opentelemetry.sdk.metrics._internal.sdk_configuration import (
48 SdkConfiguration,
49)
50from opentelemetry.sdk.resources import Resource
51from opentelemetry.sdk.util.instrumentation import InstrumentationScope
52from opentelemetry.util._once import Once
# Module-level logger named after this module; `Meter` and `MeterProvider`
# use it for their warnings (duplicate instrument registration, repeated
# shutdown, unusable meter requests).
_logger = getLogger(__name__)
class Meter(APIMeter):
    """See `opentelemetry.metrics.Meter`.

    Creates instruments bound to one instrumentation scope and one
    measurement consumer. Each instrument created here is stored in a mapping
    keyed by the instrument id returned from ``_is_instrument_registered``;
    asking again for an instrument that is reported as registered logs a
    warning and returns the stored object instead of building a new one.
    """

    def __init__(
        self,
        instrumentation_scope: InstrumentationScope,
        measurement_consumer: MeasurementConsumer,
    ):
        """Bind the meter to its scope and measurement consumer.

        Args:
            instrumentation_scope: Scope handed to every instrument this
                meter creates (also forwarded to the base initializer).
            measurement_consumer: Consumer handed to every instrument this
                meter creates; observable instruments are additionally
                registered with it.
        """
        super().__init__(instrumentation_scope)
        self._instrumentation_scope = instrumentation_scope
        self._measurement_consumer = measurement_consumer
        # Maps instrument id -> the instrument created for that id.
        self._instrument_id_instrument = {}
        # Guards reads and writes of the mapping above.
        self._instrument_id_instrument_lock = Lock()

    def _get_or_create_instrument(
        self,
        sdk_type,
        api_type,
        name,
        unit,
        description,
        callbacks=None,
        asynchronous=False,
    ):
        """Return the stored instrument for these arguments or create it.

        Shared implementation of all ``create_*`` methods.

        Args:
            sdk_type: SDK instrument class to register and instantiate.
            api_type: API instrument class; only its ``__name__`` is used,
                in the duplicate-registration warning.
            name: Instrument name.
            unit: Instrument unit.
            description: Instrument description.
            callbacks: Passed through unchanged to the constructor of
                asynchronous instruments; ignored otherwise.
            asynchronous: When true, ``callbacks`` is passed to the
                constructor and the new instrument is registered with the
                measurement consumer before it is stored.

        Returns:
            The previously stored instrument when the id is reported as
            registered already, otherwise the newly created one.
        """
        (
            is_instrument_registered,
            instrument_id,
        ) = self._is_instrument_registered(name, sdk_type, unit, description)

        if is_instrument_registered:
            # FIXME #2558 go through all views here and check if this
            # instrument registration conflict can be fixed. If it can be, do
            # not log the following warning.
            _logger.warning(
                "An instrument with name %s, type %s, unit %s and "
                "description %s has been created already.",
                name,
                api_type.__name__,
                unit,
                description,
            )
            # NOTE(review): the registration check above and the store at the
            # end of this method use separate lock acquisitions. Whether a
            # concurrent caller can observe the id as registered before the
            # instrument has been stored depends on
            # `_is_instrument_registered`, which is defined outside this
            # file -- confirm.
            with self._instrument_id_instrument_lock:
                return self._instrument_id_instrument[instrument_id]

        if asynchronous:
            instrument = sdk_type(
                name,
                self._instrumentation_scope,
                self._measurement_consumer,
                callbacks,
                unit,
                description,
            )
            self._measurement_consumer.register_asynchronous_instrument(
                instrument
            )
        else:
            instrument = sdk_type(
                name,
                self._instrumentation_scope,
                self._measurement_consumer,
                unit,
                description,
            )

        with self._instrument_id_instrument_lock:
            self._instrument_id_instrument[instrument_id] = instrument
            return instrument

    def create_counter(self, name, unit="", description="") -> APICounter:
        """Return the counter for these arguments, creating it if needed.

        Args:
            name: Instrument name.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _Counter, APICounter, name, unit, description
        )

    def create_up_down_counter(
        self, name, unit="", description=""
    ) -> APIUpDownCounter:
        """Return the up-down counter for these arguments, creating it if
        needed.

        Args:
            name: Instrument name.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _UpDownCounter, APIUpDownCounter, name, unit, description
        )

    def create_observable_counter(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableCounter:
        """Return the observable counter for these arguments, creating it
        and registering it with the measurement consumer if needed.

        Args:
            name: Instrument name.
            callbacks: Passed through to the instrument constructor.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _ObservableCounter,
            APIObservableCounter,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )

    def create_histogram(self, name, unit="", description="") -> APIHistogram:
        """Return the histogram for these arguments, creating it if needed.

        Args:
            name: Instrument name.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _Histogram, APIHistogram, name, unit, description
        )

    def create_observable_gauge(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableGauge:
        """Return the observable gauge for these arguments, creating it and
        registering it with the measurement consumer if needed.

        Args:
            name: Instrument name.
            callbacks: Passed through to the instrument constructor.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _ObservableGauge,
            APIObservableGauge,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )

    def create_observable_up_down_counter(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableUpDownCounter:
        """Return the observable up-down counter for these arguments,
        creating it and registering it with the measurement consumer if
        needed.

        Args:
            name: Instrument name.
            callbacks: Passed through to the instrument constructor.
            unit: Instrument unit.
            description: Instrument description.
        """
        return self._get_or_create_instrument(
            _ObservableUpDownCounter,
            APIObservableUpDownCounter,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )
class MeterProvider(APIMeterProvider):
    r"""See `opentelemetry.metrics.MeterProvider`.

    Args:
        metric_readers: Register metric readers to collect metrics from the
            SDK on demand. Each
            :class:`opentelemetry.sdk.metrics.export.MetricReader` is
            completely independent and will collect separate streams of
            metrics. A reader may be registered with only one
            ``MeterProvider``; registering it again raises an exception.
            TODO: reference ``PeriodicExportingMetricReader`` usage with push
            exporters here.
        resource: The resource representing what the metrics emitted from the
            SDK pertain to. When omitted (or ``None``), ``Resource.create({})``
            is built at construction time.
        shutdown_on_exit: If true, registers an `atexit` handler to call
            `MeterProvider.shutdown`
        views: The views to configure the metric output the SDK

    By default, instruments which do not match any
    :class:`opentelemetry.sdk.metrics.view.View` (or if no
    :class:`opentelemetry.sdk.metrics.view.View`\ s are provided) will report
    metrics with the default aggregation for the instrument's kind. To
    disable instruments by default, configure a match-all
    :class:`opentelemetry.sdk.metrics.view.View` with `DropAggregation` and
    then create :class:`opentelemetry.sdk.metrics.view.View`\ s to re-enable
    individual instruments:

    .. code-block:: python
        :caption: Disable default views

        MeterProvider(
            views=[
                View(instrument_name="*", aggregation=DropAggregation()),
                View(instrument_name="mycounter"),
            ],
            # ...
        )
    """

    # Class-level registry shared by every provider instance: the metric
    # readers already attached to some provider, and the lock guarding it.
    _all_metric_readers_lock = Lock()
    _all_metric_readers = set()

    def __init__(
        self,
        metric_readers: Sequence[
            "opentelemetry.sdk.metrics.export.MetricReader"
        ] = (),
        resource: Optional[Resource] = None,
        shutdown_on_exit: bool = True,
        views: Sequence["opentelemetry.sdk.metrics.view.View"] = (),
    ):
        """Configure the provider and attach its metric readers.

        Raises:
            Exception: If one of ``metric_readers`` is already registered
                with a ``MeterProvider``.
        """
        # Build the default here rather than in the signature: a default
        # expression in the signature is evaluated only once, when the class
        # body runs, and that single object would then be shared by every
        # provider created without an explicit resource.
        if resource is None:
            resource = Resource.create({})

        # NOTE(review): `_lock` is not used by any method of this class shown
        # here; kept in case code elsewhere relies on the attribute.
        self._lock = Lock()
        self._meter_lock = Lock()
        self._atexit_handler = None
        self._sdk_config = SdkConfiguration(
            resource=resource,
            metric_readers=metric_readers,
            views=views,
        )
        self._measurement_consumer = SynchronousMeasurementConsumer(
            sdk_config=self._sdk_config
        )

        if shutdown_on_exit:
            # `atexit.register` returns the callable it was given, so this
            # keeps a handle that `shutdown` can later unregister.
            self._atexit_handler = register(self.shutdown)

        # Maps InstrumentationScope -> Meter handed out for that scope.
        self._meters = {}
        self._shutdown_once = Once()
        self._shutdown = False

        for metric_reader in self._sdk_config.metric_readers:

            with self._all_metric_readers_lock:
                if metric_reader in self._all_metric_readers:
                    raise Exception(
                        f"MetricReader {metric_reader} has been registered "
                        "already in other MeterProvider instance"
                    )

                self._all_metric_readers.add(metric_reader)

            metric_reader._set_collect_callback(
                self._measurement_consumer.collect
            )

    @staticmethod
    def _describe_metric_reader_errors(metric_reader_error) -> str:
        """Render one ``ClassName: repr(error)`` line per failed reader."""
        return "\n".join(
            f"{metric_reader.__class__.__name__}: {repr(error)}"
            for metric_reader, error in metric_reader_error.items()
        )

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Call ``force_flush`` on every metric reader within one deadline.

        Every reader is attempted; each receives the time remaining until
        the shared deadline as its own timeout. Readers reached after the
        deadline are recorded as failed with a ``MetricsTimeoutError``.

        Args:
            timeout_millis: Total time budget, in milliseconds, for all
                readers together.

        Returns:
            ``True`` when no reader failed.

        Raises:
            Exception: If at least one reader failed; the message lists each
                failing reader's class name and error.
        """
        deadline_ns = time_ns() + timeout_millis * 10**6

        metric_reader_error = {}

        for metric_reader in self._sdk_config.metric_readers:
            current_ts = time_ns()
            try:
                if current_ts >= deadline_ns:
                    raise MetricsTimeoutError(
                        "Timed out while flushing metric readers"
                    )
                metric_reader.force_flush(
                    timeout_millis=(deadline_ns - current_ts) / 10**6
                )

            # Collect the failure and keep going so the remaining readers
            # still get their chance to flush.
            # pylint: disable=broad-except
            except Exception as error:

                metric_reader_error[metric_reader] = error

        if metric_reader_error:

            metric_reader_error_string = self._describe_metric_reader_errors(
                metric_reader_error
            )

            raise Exception(
                "MeterProvider.force_flush failed because the following "
                "metric readers failed during collect:\n"
                f"{metric_reader_error_string}"
            )
        return True

    def shutdown(self, timeout_millis: float = 30_000):
        """Shut down every metric reader; effective only on the first call.

        Later calls log a warning and return without doing anything. On the
        first call the provider is flagged as shut down, each reader's
        ``shutdown`` is called with the time remaining until the shared
        deadline, and the `atexit` handler (if one was registered) is
        removed.

        Args:
            timeout_millis: Total time budget, in milliseconds, for all
                readers together.

        Raises:
            Exception: If at least one reader failed to shut down or was
                reached only after the deadline; the message lists each
                failing reader's class name and error.
        """
        deadline_ns = time_ns() + timeout_millis * 10**6

        def _shutdown():
            self._shutdown = True

        did_shutdown = self._shutdown_once.do_once(_shutdown)

        if not did_shutdown:
            _logger.warning("shutdown can only be called once")
            return

        metric_reader_error = {}

        for metric_reader in self._sdk_config.metric_readers:
            current_ts = time_ns()
            try:
                if current_ts >= deadline_ns:
                    raise Exception(
                        "Didn't get to execute, deadline already exceeded"
                    )
                metric_reader.shutdown(
                    timeout_millis=(deadline_ns - current_ts) / 10**6
                )

            # Collect the failure and keep going so the remaining readers
            # are still shut down.
            # pylint: disable=broad-except
            except Exception as error:

                metric_reader_error[metric_reader] = error

        if self._atexit_handler is not None:
            unregister(self._atexit_handler)
            self._atexit_handler = None

        if metric_reader_error:

            metric_reader_error_string = self._describe_metric_reader_errors(
                metric_reader_error
            )

            raise Exception(
                (
                    "MeterProvider.shutdown failed because the following "
                    "metric readers failed during shutdown:\n"
                    f"{metric_reader_error_string}"
                )
            )

    def get_meter(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> Meter:
        """Return the meter for the given instrumentation scope.

        One `Meter` is created per distinct
        ``InstrumentationScope(name, version, schema_url)`` and reused on
        later calls. A ``NoOpMeter`` is returned instead, after logging a
        warning, when the provider has been shut down or ``name`` is empty
        or ``None``.

        Args:
            name: Name of the instrumentation scope.
            version: Optional version of the instrumentation scope.
            schema_url: Optional schema URL of the instrumentation scope.
        """

        if self._shutdown:
            _logger.warning(
                "A shutdown `MeterProvider` can not provide a `Meter`"
            )
            return NoOpMeter(name, version=version, schema_url=schema_url)

        if not name:
            _logger.warning("Meter name cannot be None or empty.")
            return NoOpMeter(name, version=version, schema_url=schema_url)

        info = InstrumentationScope(name, version, schema_url)
        with self._meter_lock:
            # Test for presence explicitly instead of relying on the
            # truthiness of the cached meter object.
            meter = self._meters.get(info)
            if meter is None:
                # FIXME #2558 pass SDKConfig object to meter so that the meter
                # has access to views.
                meter = Meter(
                    info,
                    self._measurement_consumer,
                )
                self._meters[info] = meter
            return meter