Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py: 29%
55 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from logging import getLogger
17from threading import Lock
18from time import time_ns
19from typing import Dict, List, Sequence
21from opentelemetry.metrics import Instrument
22from opentelemetry.sdk.metrics._internal.aggregation import (
23 Aggregation,
24 DefaultAggregation,
25 _Aggregation,
26 _SumAggregation,
27)
28from opentelemetry.sdk.metrics._internal.export import AggregationTemporality
29from opentelemetry.sdk.metrics._internal.measurement import Measurement
30from opentelemetry.sdk.metrics._internal.point import DataPointT
31from opentelemetry.sdk.metrics._internal.view import View
# Module-level logger named after this module (via ``__name__``).
33_logger = getLogger(__name__)
class _ViewInstrumentMatch:
    """Pairs one view with one instrument and holds their aggregations.

    One aggregation object is kept per distinct set of measurement
    attributes (after the view's attribute-key filter, when the view has
    one, has been applied).
    """

    def __init__(
        self,
        view: View,
        instrument: Instrument,
        instrument_class_aggregation: Dict[type, Aggregation],
    ):
        """Store the view/instrument pair and build a reference aggregation.

        Args:
            view: View whose name, description, attribute keys and
                aggregation configure this match.
            instrument: Instrument whose measurements are consumed.
            instrument_class_aggregation: Aggregation to use, keyed by
                instrument class, when the view's aggregation is a
                ``DefaultAggregation``.

        Raises:
            KeyError: If the view uses ``DefaultAggregation`` and the
                instrument's class is missing from
                ``instrument_class_aggregation``.
        """
        self._start_time_unix_nano = time_ns()
        self._view = view
        self._instrument = instrument
        self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
        self._lock = Lock()
        self._instrument_class_aggregation = instrument_class_aggregation
        # View-level overrides win; fall back to the instrument's values.
        self._name = self._view._name or self._instrument.name
        self._description = (
            self._view._description or self._instrument.description
        )
        # Reference aggregation (no attributes, start time 0). It is the
        # object ``conflicts`` inspects; measurements are aggregated into
        # the per-attribute entries of ``_attributes_aggregation`` instead.
        self._aggregation = self._new_aggregation(None, 0)

    def _new_aggregation(
        self, attributes, start_time_unix_nano: int
    ) -> _Aggregation:
        """Create an aggregation for this view/instrument pair.

        The view's own aggregation is used unless it is a
        ``DefaultAggregation``, in which case the one registered for the
        instrument's class is used.

        Args:
            attributes: Attributes passed to the new aggregation, or
                ``None``.
            start_time_unix_nano: Start time passed to the new aggregation.

        Raises:
            KeyError: If the fallback lookup by instrument class fails.
        """
        # pylint: disable=protected-access
        factory = self._view._aggregation
        if isinstance(factory, DefaultAggregation):
            factory = self._instrument_class_aggregation[
                self._instrument.__class__
            ]
        return factory._create_aggregation(
            self._instrument, attributes, start_time_unix_nano
        )

    def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
        """Return whether ``other`` matches this object's identifying fields.

        The fields compared are the resolved name, the instrument unit and
        the aggregation class. When this object's aggregation is a
        ``_SumAggregation``, monotonicity and instrument temporality are
        compared as well.
        """
        # pylint: disable=protected-access

        result = (
            self._name == other._name
            and self._instrument.unit == other._instrument.unit
            # The aggregation class is being used here instead of data point
            # type since they are functionally equivalent.
            and self._aggregation.__class__ == other._aggregation.__class__
        )
        if isinstance(self._aggregation, _SumAggregation):
            # ``result and ...`` short-circuits, so the sum-specific
            # attributes of ``other`` are only read when the aggregation
            # classes already matched.
            result = (
                result
                and self._aggregation._instrument_is_monotonic
                == other._aggregation._instrument_is_monotonic
                and self._aggregation._instrument_temporality
                == other._aggregation._instrument_temporality
            )

        return result

    # pylint: disable=protected-access
    def consume_measurement(self, measurement: Measurement) -> None:
        """Route ``measurement`` to the aggregation for its attribute set.

        If the view defines attribute keys, only those keys are kept from
        the measurement's attributes; otherwise all attributes are used
        (an empty mapping when the measurement has none). An aggregation
        is created on first sight of each distinct attribute set.
        """
        if self._view._attribute_keys is not None:
            attributes = {
                key: value
                for key, value in (measurement.attributes or {}).items()
                if key in self._view._attribute_keys
            }
        elif measurement.attributes is not None:
            attributes = measurement.attributes
        else:
            attributes = {}

        aggr_key = frozenset(attributes.items())

        if aggr_key not in self._attributes_aggregation:
            with self._lock:
                # The membership test is repeated after acquiring the lock
                # so an entry inserted between the two checks is kept
                # rather than overwritten.
                if aggr_key not in self._attributes_aggregation:
                    self._attributes_aggregation[
                        aggr_key
                    ] = self._new_aggregation(
                        attributes, self._start_time_unix_nano
                    )

        self._attributes_aggregation[aggr_key].aggregate(measurement)

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nanos: int,
    ) -> Sequence[DataPointT]:
        """Collect one data point from every per-attribute aggregation.

        Args:
            aggregation_temporality: Forwarded to each aggregation's
                ``collect``.
            collection_start_nanos: Forwarded to each aggregation's
                ``collect``.

        Returns:
            The data points that were not ``None``, in the iteration order
            of the internal aggregation mapping.
        """
        data_points: List[DataPointT] = []
        # The lock is held for the whole pass, so no new aggregation can be
        # inserted by ``consume_measurement`` while the mapping is iterated.
        with self._lock:
            for aggregation in self._attributes_aggregation.values():
                data_point = aggregation.collect(
                    aggregation_temporality, collection_start_nanos
                )
                if data_point is not None:
                    data_points.append(data_point)
        return data_points