Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py: 31%
178 statements
coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from bisect import bisect_left
from enum import IntEnum
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, List, Optional, Sequence, TypeVar

from opentelemetry.metrics import (
    Asynchronous,
    Counter,
    Histogram,
    Instrument,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    Synchronous,
    UpDownCounter,
)
from opentelemetry.sdk.metrics._internal.measurement import Measurement
from opentelemetry.sdk.metrics._internal.point import Gauge
from opentelemetry.sdk.metrics._internal.point import (
    Histogram as HistogramPoint,
)
from opentelemetry.sdk.metrics._internal.point import (
    HistogramDataPoint,
    NumberDataPoint,
    Sum,
)
from opentelemetry.util.types import Attributes

_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)

_logger = getLogger(__name__)


class AggregationTemporality(IntEnum):
    """
    The temporality to use when aggregating data.

    Can be one of the following values:
    """

    UNSPECIFIED = 0
    DELTA = 1
    CUMULATIVE = 2
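
    # DELTA reports only the change accumulated since the previous
    # collection, while CUMULATIVE reports the running total since the
    # aggregation's start time; UNSPECIFIED means no temporality has been
    # chosen yet.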


class _Aggregation(ABC, Generic[_DataPointVarT]):
    def __init__(self, attributes: Attributes):
        self._lock = Lock()
        self._attributes = attributes
        self._previous_point = None

    @abstractmethod
    def aggregate(self, measurement: Measurement) -> None:
        pass

    @abstractmethod
    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        pass


class _DropAggregation(_Aggregation):
    def aggregate(self, measurement: Measurement) -> None:
        pass

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        pass


class _SumAggregation(_Aggregation[Sum]):
    def __init__(
        self,
        attributes: Attributes,
        instrument_is_monotonic: bool,
        instrument_temporality: AggregationTemporality,
        start_time_unix_nano: int,
    ):
        super().__init__(attributes)

        self._start_time_unix_nano = start_time_unix_nano
        self._instrument_temporality = instrument_temporality
        self._instrument_is_monotonic = instrument_is_monotonic
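
        # A DELTA-temporality (synchronous) instrument starts at 0 so every
        # collection can emit a point; a CUMULATIVE-temporality (asynchronous)
        # instrument starts at None so collect() skips output until the first
        # observation arrives.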
        if self._instrument_temporality is AggregationTemporality.DELTA:
            self._value = 0
        else:
            self._value = None

    def aggregate(self, measurement: Measurement) -> None:
        with self._lock:
            if self._value is None:
                self._value = 0
            self._value = self._value + measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[NumberDataPoint]:
        """
        Atomically return a point for the current value of the metric and
        reset the aggregation value.
        """
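        # The DELTA branch (synchronous instruments) hands out the accumulated
        # value, resets it to 0 and advances the start time; the CUMULATIVE
        # branch (asynchronous instruments) hands out the observed value,
        # clears it to None and keeps the original start time.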
        if self._instrument_temporality is AggregationTemporality.DELTA:

            with self._lock:
                value = self._value
                start_time_unix_nano = self._start_time_unix_nano

                self._value = 0
                self._start_time_unix_nano = collection_start_nano

        else:

            with self._lock:
                if self._value is None:
                    return None
                value = self._value
                self._value = None
                start_time_unix_nano = self._start_time_unix_nano

        current_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            value=value,
        )

        if self._previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
            # Output DELTA for a synchronous instrument
            # Output CUMULATIVE for an asynchronous instrument
            self._previous_point = current_point
            return current_point

        if aggregation_temporality is AggregationTemporality.DELTA:
            # Output temporality DELTA for an asynchronous instrument
            value = current_point.value - self._previous_point.value
            output_start_time_unix_nano = self._previous_point.time_unix_nano

        else:
            # Output CUMULATIVE for a synchronous instrument
            value = current_point.value + self._previous_point.value
            output_start_time_unix_nano = (
                self._previous_point.start_time_unix_nano
            )

        current_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=output_start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            value=value,
        )

        self._previous_point = current_point
        return current_point
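
    # Worked example of the conversion above: a synchronous Counter (native
    # DELTA) asked for CUMULATIVE output and recording deltas 3 and then 4
    # yields points with values 3 and 7, both carrying the start time of the
    # first point.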


class _LastValueAggregation(_Aggregation[Gauge]):
    def __init__(self, attributes: Attributes):
        super().__init__(attributes)
        self._value = None

    def aggregate(self, measurement: Measurement):
        with self._lock:
            self._value = measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
        """
        with self._lock:
            if self._value is None:
                return None
            value = self._value
            self._value = None
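
        # Gauges carry no aggregation interval: the point reports only the
        # collection timestamp, with start_time_unix_nano fixed to 0, and the
        # stored value is cleared so an unchanged gauge is not re-emitted.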
        return NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=0,
            time_unix_nano=collection_start_nano,
            value=value,
        )


class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
    def __init__(
        self,
        attributes: Attributes,
        start_time_unix_nano: int,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ):
        super().__init__(attributes)
        self._boundaries = tuple(boundaries)
        self._bucket_counts = self._get_empty_bucket_counts()
        self._min = inf
        self._max = -inf
        self._sum = 0
        self._record_min_max = record_min_max
        self._start_time_unix_nano = start_time_unix_nano
        # It is assumed that the "natural" aggregation temporality for a
        # Histogram instrument is DELTA, like the "natural" aggregation
        # temporality for a Counter is DELTA and the "natural" aggregation
        # temporality for an ObservableCounter is CUMULATIVE.
        self._instrument_temporality = AggregationTemporality.DELTA

    def _get_empty_bucket_counts(self) -> List[int]:
        return [0] * (len(self._boundaries) + 1)

    def aggregate(self, measurement: Measurement) -> None:

        value = measurement.value

        if self._record_min_max:
            self._min = min(self._min, value)
            self._max = max(self._max, value)

        self._sum += value
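
        # bisect_left returns the index of the first boundary that is
        # >= value, so a value equal to a boundary lands in that boundary's
        # bucket; values above the last boundary land in the extra overflow
        # bucket at index len(boundaries).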
        self._bucket_counts[bisect_left(self._boundaries, value)] += 1

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
        """
        with self._lock:
            if not any(self._bucket_counts):
                return None

            bucket_counts = self._bucket_counts
            start_time_unix_nano = self._start_time_unix_nano
            sum_ = self._sum
            max_ = self._max
            min_ = self._min

            self._bucket_counts = self._get_empty_bucket_counts()
            self._start_time_unix_nano = collection_start_nano
            self._sum = 0
            self._min = inf
            self._max = -inf

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=self._boundaries,
            min=min_,
            max=max_,
        )

        if self._previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
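            # First collection, or the requested temporality matches the
            # histogram's native DELTA temporality: emit the point as-is.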
            self._previous_point = current_point
            return current_point

        max_ = current_point.max
        min_ = current_point.min

        if aggregation_temporality is AggregationTemporality.CUMULATIVE:
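            # Convert the native DELTA point to CUMULATIVE by merging it with
            # the previous point: sums and bucket counts are added and the
            # earlier start time is kept.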
            start_time_unix_nano = self._previous_point.start_time_unix_nano
            sum_ = current_point.sum + self._previous_point.sum
            # Only update min/max on delta -> cumulative
            max_ = max(current_point.max, self._previous_point.max)
            min_ = min(current_point.min, self._previous_point.min)
            bucket_counts = [
                curr_count + prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]
        else:
            start_time_unix_nano = self._previous_point.time_unix_nano
            sum_ = current_point.sum - self._previous_point.sum
            bucket_counts = [
                curr_count - prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=current_point.explicit_bounds,
            min=min_,
            max=max_,
        )
        self._previous_point = current_point
        return current_point
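
    # Worked example of the bucketing above: with boundaries (0.0, 5.0, 10.0)
    # there are four buckets, and measurements 1, 7, 7 and 12 produce
    # bucket_counts (0, 1, 2, 1) with count=4, sum=27, min=1 and max=12.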


class Aggregation(ABC):
    """
    Base class for all aggregation types.
    """

    @abstractmethod
    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """Creates an aggregation"""


class DefaultAggregation(Aggregation):
    """
    The default aggregation to be used in a `View`.

    This aggregation will create an actual aggregation depending on the
    instrument type, as specified next:

    ==================================================== ====================================
    Instrument                                           Aggregation
    ==================================================== ====================================
    `opentelemetry.sdk.metrics.Counter`                  `SumAggregation`
    `opentelemetry.sdk.metrics.UpDownCounter`            `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableCounter`        `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableUpDownCounter`  `SumAggregation`
    `opentelemetry.sdk.metrics.Histogram`                `ExplicitBucketHistogramAggregation`
    `opentelemetry.sdk.metrics.ObservableGauge`          `LastValueAggregation`
    ==================================================== ====================================
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:

        # pylint: disable=too-many-return-statements
        if isinstance(instrument, Counter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=True,
                instrument_temporality=AggregationTemporality.DELTA,
                start_time_unix_nano=start_time_unix_nano,
            )
        if isinstance(instrument, UpDownCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=False,
                instrument_temporality=AggregationTemporality.DELTA,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, ObservableCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=True,
                instrument_temporality=AggregationTemporality.CUMULATIVE,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, ObservableUpDownCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=False,
                instrument_temporality=AggregationTemporality.CUMULATIVE,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, Histogram):
            return _ExplicitBucketHistogramAggregation(
                attributes, start_time_unix_nano
            )

        if isinstance(instrument, ObservableGauge):
            return _LastValueAggregation(attributes)

        raise Exception(f"Invalid instrument type {type(instrument)} found")


class ExplicitBucketHistogramAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - Count of Measurement values falling within explicit bucket boundaries.
    - Arithmetic sum of Measurement values in population. This SHOULD NOT be
      collected when used with instruments that record negative measurements,
      e.g. UpDownCounter or ObservableGauge.
    - Min (optional) Measurement value in population.
    - Max (optional) Measurement value in population.

    Args:
        boundaries: Array of increasing values representing explicit bucket
            boundary values.
        record_min_max: Whether to record min and max.
    """

    def __init__(
        self,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ) -> None:
        self._boundaries = boundaries
        self._record_min_max = record_min_max

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _ExplicitBucketHistogramAggregation(
            attributes,
            start_time_unix_nano,
            self._boundaries,
            self._record_min_max,
        )
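
    # Usage sketch (assumption, not part of this module): a View from
    # `opentelemetry.sdk.metrics.view` would typically carry this aggregation
    # to matching instruments, e.g. roughly:
    #
    #     View(
    #         instrument_name="http.server.duration",
    #         aggregation=ExplicitBucketHistogramAggregation(
    #             boundaries=(0.0, 10.0, 100.0, 1000.0)
    #         ),
    #     )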


class SumAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - The arithmetic sum of Measurement values.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:

        temporality = AggregationTemporality.UNSPECIFIED
        if isinstance(instrument, Synchronous):
            temporality = AggregationTemporality.DELTA
        elif isinstance(instrument, Asynchronous):
            temporality = AggregationTemporality.CUMULATIVE

        return _SumAggregation(
            attributes,
            isinstance(instrument, (Counter, ObservableCounter)),
            temporality,
            start_time_unix_nano,
        )


class LastValueAggregation(Aggregation):
    """
    This aggregation informs the SDK to collect:

    - The last Measurement.
    - The timestamp of the last Measurement.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _LastValueAggregation(attributes)


class DropAggregation(Aggregation):
    """Using this aggregation will make all measurements be ignored."""

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _DropAggregation(attributes)
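

# Minimal wiring sketch (assumption, not part of this module): the public
# aggregations above are normally attached to instruments through Views on a
# MeterProvider, e.g. roughly:
#
#     from opentelemetry.sdk.metrics import MeterProvider
#     from opentelemetry.sdk.metrics.view import View
#
#     provider = MeterProvider(
#         views=[
#             View(instrument_name="queue.size", aggregation=LastValueAggregation()),
#             View(instrument_name="noisy.counter", aggregation=DropAggregation()),
#         ],
#     )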