Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py: 30%
77 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Copyright The OpenTelemetry Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15from logging import getLogger
16from threading import RLock
17from time import time_ns
18from typing import Dict, List
20from opentelemetry.metrics import (
21 Asynchronous,
22 Counter,
23 Instrument,
24 ObservableCounter,
25)
26from opentelemetry.sdk.metrics._internal._view_instrument_match import (
27 _ViewInstrumentMatch,
28)
29from opentelemetry.sdk.metrics._internal.aggregation import (
30 Aggregation,
31 ExplicitBucketHistogramAggregation,
32 _DropAggregation,
33 _ExplicitBucketHistogramAggregation,
34 _LastValueAggregation,
35 _SumAggregation,
36)
37from opentelemetry.sdk.metrics._internal.export import AggregationTemporality
38from opentelemetry.sdk.metrics._internal.measurement import Measurement
39from opentelemetry.sdk.metrics._internal.point import (
40 Gauge,
41 Histogram,
42 Metric,
43 MetricsData,
44 ResourceMetrics,
45 ScopeMetrics,
46 Sum,
47)
48from opentelemetry.sdk.metrics._internal.sdk_configuration import (
49 SdkConfiguration,
50)
51from opentelemetry.sdk.metrics._internal.view import View
52from opentelemetry.sdk.util.instrumentation import InstrumentationScope
54_logger = getLogger(__name__)
56_DEFAULT_VIEW = View(instrument_name="")
59class MetricReaderStorage:
60 """The SDK's storage for a given reader"""
62 def __init__(
63 self,
64 sdk_config: SdkConfiguration,
65 instrument_class_temporality: Dict[type, AggregationTemporality],
66 instrument_class_aggregation: Dict[type, Aggregation],
67 ) -> None:
68 self._lock = RLock()
69 self._sdk_config = sdk_config
70 self._instrument_view_instrument_matches: Dict[
71 Instrument, List[_ViewInstrumentMatch]
72 ] = {}
73 self._instrument_class_temporality = instrument_class_temporality
74 self._instrument_class_aggregation = instrument_class_aggregation
76 def _get_or_init_view_instrument_match(
77 self, instrument: Instrument
78 ) -> List[_ViewInstrumentMatch]:
79 # Optimistically get the relevant views for the given instrument. Once set for a given
80 # instrument, the mapping will never change
82 if instrument in self._instrument_view_instrument_matches:
83 return self._instrument_view_instrument_matches[instrument]
85 with self._lock:
86 # double check if it was set before we held the lock
87 if instrument in self._instrument_view_instrument_matches:
88 return self._instrument_view_instrument_matches[instrument]
90 # not present, hold the lock and add a new mapping
91 view_instrument_matches = []
93 self._handle_view_instrument_match(
94 instrument, view_instrument_matches
95 )
97 # if no view targeted the instrument, use the default
98 if not view_instrument_matches:
99 view_instrument_matches.append(
100 _ViewInstrumentMatch(
101 view=_DEFAULT_VIEW,
102 instrument=instrument,
103 instrument_class_aggregation=(
104 self._instrument_class_aggregation
105 ),
106 )
107 )
108 self._instrument_view_instrument_matches[
109 instrument
110 ] = view_instrument_matches
112 return view_instrument_matches
114 def consume_measurement(self, measurement: Measurement) -> None:
115 for view_instrument_match in self._get_or_init_view_instrument_match(
116 measurement.instrument
117 ):
118 view_instrument_match.consume_measurement(measurement)
120 def collect(self) -> MetricsData:
121 # Use a list instead of yielding to prevent a slow reader from holding
122 # SDK locks
124 # While holding the lock, new _ViewInstrumentMatch can't be added from
125 # another thread (so we are sure we collect all existing view).
126 # However, instruments can still send measurements that will make it
127 # into the individual aggregations; collection will acquire those locks
128 # iteratively to keep locking as fine-grained as possible. One side
129 # effect is that end times can be slightly skewed among the metric
130 # streams produced by the SDK, but we still align the output timestamps
131 # for a single instrument.
133 collection_start_nanos = time_ns()
135 with self._lock:
137 instrumentation_scope_scope_metrics: (
138 Dict[InstrumentationScope, ScopeMetrics]
139 ) = {}
141 for (
142 instrument,
143 view_instrument_matches,
144 ) in self._instrument_view_instrument_matches.items():
145 aggregation_temporality = self._instrument_class_temporality[
146 instrument.__class__
147 ]
149 metrics: List[Metric] = []
151 for view_instrument_match in view_instrument_matches:
153 if isinstance(
154 # pylint: disable=protected-access
155 view_instrument_match._aggregation,
156 _SumAggregation,
157 ):
158 data = Sum(
159 aggregation_temporality=aggregation_temporality,
160 data_points=view_instrument_match.collect(
161 aggregation_temporality, collection_start_nanos
162 ),
163 is_monotonic=isinstance(
164 instrument, (Counter, ObservableCounter)
165 ),
166 )
167 elif isinstance(
168 # pylint: disable=protected-access
169 view_instrument_match._aggregation,
170 _LastValueAggregation,
171 ):
172 data = Gauge(
173 data_points=view_instrument_match.collect(
174 aggregation_temporality, collection_start_nanos
175 )
176 )
177 elif isinstance(
178 # pylint: disable=protected-access
179 view_instrument_match._aggregation,
180 _ExplicitBucketHistogramAggregation,
181 ):
182 data = Histogram(
183 data_points=view_instrument_match.collect(
184 aggregation_temporality, collection_start_nanos
185 ),
186 aggregation_temporality=aggregation_temporality,
187 )
188 elif isinstance(
189 # pylint: disable=protected-access
190 view_instrument_match._aggregation,
191 _DropAggregation,
192 ):
193 continue
195 metrics.append(
196 Metric(
197 # pylint: disable=protected-access
198 name=view_instrument_match._name,
199 description=view_instrument_match._description,
200 unit=view_instrument_match._instrument.unit,
201 data=data,
202 )
203 )
205 if instrument.instrumentation_scope not in (
206 instrumentation_scope_scope_metrics
207 ):
208 instrumentation_scope_scope_metrics[
209 instrument.instrumentation_scope
210 ] = ScopeMetrics(
211 scope=instrument.instrumentation_scope,
212 metrics=metrics,
213 schema_url=instrument.instrumentation_scope.schema_url,
214 )
215 else:
216 instrumentation_scope_scope_metrics[
217 instrument.instrumentation_scope
218 ].metrics.extend(metrics)
220 return MetricsData(
221 resource_metrics=[
222 ResourceMetrics(
223 resource=self._sdk_config.resource,
224 scope_metrics=list(
225 instrumentation_scope_scope_metrics.values()
226 ),
227 schema_url=self._sdk_config.resource.schema_url,
228 )
229 ]
230 )
232 def _handle_view_instrument_match(
233 self,
234 instrument: Instrument,
235 view_instrument_matches: List["_ViewInstrumentMatch"],
236 ) -> None:
237 for view in self._sdk_config.views:
238 # pylint: disable=protected-access
239 if not view._match(instrument):
240 continue
242 if not self._check_view_instrument_compatibility(view, instrument):
243 continue
245 new_view_instrument_match = _ViewInstrumentMatch(
246 view=view,
247 instrument=instrument,
248 instrument_class_aggregation=(
249 self._instrument_class_aggregation
250 ),
251 )
253 for (
254 existing_view_instrument_matches
255 ) in self._instrument_view_instrument_matches.values():
256 for (
257 existing_view_instrument_match
258 ) in existing_view_instrument_matches:
259 if existing_view_instrument_match.conflicts(
260 new_view_instrument_match
261 ):
263 _logger.warning(
264 "Views %s and %s will cause conflicting "
265 "metrics identities",
266 existing_view_instrument_match._view,
267 new_view_instrument_match._view,
268 )
270 view_instrument_matches.append(new_view_instrument_match)
272 @staticmethod
273 def _check_view_instrument_compatibility(
274 view: View, instrument: Instrument
275 ) -> bool:
276 """
277 Checks if a view and an instrument are compatible.
279 Returns `true` if they are compatible and a `_ViewInstrumentMatch`
280 object should be created, `false` otherwise.
281 """
283 result = True
285 # pylint: disable=protected-access
286 if isinstance(instrument, Asynchronous) and isinstance(
287 view._aggregation, ExplicitBucketHistogramAggregation
288 ):
289 _logger.warning(
290 "View %s and instrument %s will produce "
291 "semantic errors when matched, the view "
292 "has not been applied.",
293 view,
294 instrument,
295 )
296 result = False
298 return result