Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py: 30%

77 statements  

coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from logging import getLogger
from threading import RLock
from time import time_ns
from typing import Dict, List

from opentelemetry.metrics import (
    Asynchronous,
    Counter,
    Instrument,
    ObservableCounter,
)
from opentelemetry.sdk.metrics._internal._view_instrument_match import (
    _ViewInstrumentMatch,
)
from opentelemetry.sdk.metrics._internal.aggregation import (
    Aggregation,
    ExplicitBucketHistogramAggregation,
    _DropAggregation,
    _ExplicitBucketHistogramAggregation,
    _LastValueAggregation,
    _SumAggregation,
)
from opentelemetry.sdk.metrics._internal.export import AggregationTemporality
from opentelemetry.sdk.metrics._internal.measurement import Measurement
from opentelemetry.sdk.metrics._internal.point import (
    Gauge,
    Histogram,
    Metric,
    MetricsData,
    ResourceMetrics,
    ScopeMetrics,
    Sum,
)
from opentelemetry.sdk.metrics._internal.sdk_configuration import (
    SdkConfiguration,
)
from opentelemetry.sdk.metrics._internal.view import View
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

_logger = getLogger(__name__)

_DEFAULT_VIEW = View(instrument_name="")
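# Note: _DEFAULT_VIEW is the fallback used in
# MetricReaderStorage._get_or_init_view_instrument_match below when no
# configured view matches an instrument, so every instrument always gets at
# least one _ViewInstrumentMatch.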

class MetricReaderStorage:
    """The SDK's storage for a given reader"""

    def __init__(
        self,
        sdk_config: SdkConfiguration,
        instrument_class_temporality: Dict[type, AggregationTemporality],
        instrument_class_aggregation: Dict[type, Aggregation],
    ) -> None:
        self._lock = RLock()
        self._sdk_config = sdk_config
        self._instrument_view_instrument_matches: Dict[
            Instrument, List[_ViewInstrumentMatch]
        ] = {}
        self._instrument_class_temporality = instrument_class_temporality
        self._instrument_class_aggregation = instrument_class_aggregation
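        # instrument_class_temporality maps instrument classes to the
        # AggregationTemporality used when their metrics are collected;
        # instrument_class_aggregation maps them to the Aggregation that is
        # passed through to every _ViewInstrumentMatch this storage creates.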

    def _get_or_init_view_instrument_match(
        self, instrument: Instrument
    ) -> List[_ViewInstrumentMatch]:
        # Optimistically get the relevant views for the given instrument. Once
        # set for a given instrument, the mapping will never change.

        if instrument in self._instrument_view_instrument_matches:
            return self._instrument_view_instrument_matches[instrument]

        with self._lock:
            # double check if it was set before we held the lock
            if instrument in self._instrument_view_instrument_matches:
                return self._instrument_view_instrument_matches[instrument]

            # not present, hold the lock and add a new mapping
            view_instrument_matches = []

            self._handle_view_instrument_match(
                instrument, view_instrument_matches
            )

            # if no view targeted the instrument, use the default
            if not view_instrument_matches:
                view_instrument_matches.append(
                    _ViewInstrumentMatch(
                        view=_DEFAULT_VIEW,
                        instrument=instrument,
                        instrument_class_aggregation=(
                            self._instrument_class_aggregation
                        ),
                    )
                )
            self._instrument_view_instrument_matches[
                instrument
            ] = view_instrument_matches

            return view_instrument_matches

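    # consume_measurement is called for every recorded Measurement: it routes
    # the measurement to all _ViewInstrumentMatch objects for its instrument,
    # creating them on first use via _get_or_init_view_instrument_match.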
    def consume_measurement(self, measurement: Measurement) -> None:
        for view_instrument_match in self._get_or_init_view_instrument_match(
            measurement.instrument
        ):
            view_instrument_match.consume_measurement(measurement)

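    # collect() converts each match's aggregation into the corresponding data
    # type (Sum, Gauge or Histogram; _DropAggregation streams are skipped),
    # groups the resulting Metric objects by instrumentation scope, and wraps
    # everything in a single MetricsData.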
    def collect(self) -> MetricsData:
        # Use a list instead of yielding to prevent a slow reader from holding
        # SDK locks

        # While holding the lock, new _ViewInstrumentMatch can't be added from
        # another thread (so we are sure we collect all existing views).
        # However, instruments can still send measurements that will make it
        # into the individual aggregations; collection will acquire those
        # locks iteratively to keep locking as fine-grained as possible. One
        # side effect is that end times can be slightly skewed among the
        # metric streams produced by the SDK, but we still align the output
        # timestamps for a single instrument.

        collection_start_nanos = time_ns()

        with self._lock:

            instrumentation_scope_scope_metrics: (
                Dict[InstrumentationScope, ScopeMetrics]
            ) = {}

            for (
                instrument,
                view_instrument_matches,
            ) in self._instrument_view_instrument_matches.items():
                aggregation_temporality = self._instrument_class_temporality[
                    instrument.__class__
                ]

                metrics: List[Metric] = []

                for view_instrument_match in view_instrument_matches:

                    if isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _SumAggregation,
                    ):
                        data = Sum(
                            aggregation_temporality=aggregation_temporality,
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            ),
                            is_monotonic=isinstance(
                                instrument, (Counter, ObservableCounter)
                            ),
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _LastValueAggregation,
                    ):
                        data = Gauge(
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            )
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _ExplicitBucketHistogramAggregation,
                    ):
                        data = Histogram(
                            data_points=view_instrument_match.collect(
                                aggregation_temporality, collection_start_nanos
                            ),
                            aggregation_temporality=aggregation_temporality,
                        )
                    elif isinstance(
                        # pylint: disable=protected-access
                        view_instrument_match._aggregation,
                        _DropAggregation,
                    ):
                        continue

                    metrics.append(
                        Metric(
                            # pylint: disable=protected-access
                            name=view_instrument_match._name,
                            description=view_instrument_match._description,
                            unit=view_instrument_match._instrument.unit,
                            data=data,
                        )
                    )

                if instrument.instrumentation_scope not in (
                    instrumentation_scope_scope_metrics
                ):
                    instrumentation_scope_scope_metrics[
                        instrument.instrumentation_scope
                    ] = ScopeMetrics(
                        scope=instrument.instrumentation_scope,
                        metrics=metrics,
                        schema_url=instrument.instrumentation_scope.schema_url,
                    )
                else:
                    instrumentation_scope_scope_metrics[
                        instrument.instrumentation_scope
                    ].metrics.extend(metrics)

            return MetricsData(
                resource_metrics=[
                    ResourceMetrics(
                        resource=self._sdk_config.resource,
                        scope_metrics=list(
                            instrumentation_scope_scope_metrics.values()
                        ),
                        schema_url=self._sdk_config.resource.schema_url,
                    )
                ]
            )

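    # _handle_view_instrument_match appends a _ViewInstrumentMatch for every
    # configured view that matches (and is compatible with) the instrument.
    # A conflict with an existing match only logs a warning; the new match is
    # still added.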
    def _handle_view_instrument_match(
        self,
        instrument: Instrument,
        view_instrument_matches: List["_ViewInstrumentMatch"],
    ) -> None:
        for view in self._sdk_config.views:
            # pylint: disable=protected-access
            if not view._match(instrument):
                continue

            if not self._check_view_instrument_compatibility(view, instrument):
                continue

            new_view_instrument_match = _ViewInstrumentMatch(
                view=view,
                instrument=instrument,
                instrument_class_aggregation=(
                    self._instrument_class_aggregation
                ),
            )

            for (
                existing_view_instrument_matches
            ) in self._instrument_view_instrument_matches.values():
                for (
                    existing_view_instrument_match
                ) in existing_view_instrument_matches:
                    if existing_view_instrument_match.conflicts(
                        new_view_instrument_match
                    ):

                        _logger.warning(
                            "Views %s and %s will cause conflicting "
                            "metrics identities",
                            existing_view_instrument_match._view,
                            new_view_instrument_match._view,
                        )

            view_instrument_matches.append(new_view_instrument_match)

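    # The only incompatibility checked below is an asynchronous instrument
    # paired with an ExplicitBucketHistogramAggregation view; that pairing is
    # rejected with a warning and the view is not applied.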
    @staticmethod
    def _check_view_instrument_compatibility(
        view: View, instrument: Instrument
    ) -> bool:
        """
        Checks if a view and an instrument are compatible.

        Returns `True` if they are compatible and a `_ViewInstrumentMatch`
        object should be created, `False` otherwise.
        """

        result = True

        # pylint: disable=protected-access
        if isinstance(instrument, Asynchronous) and isinstance(
            view._aggregation, ExplicitBucketHistogramAggregation
        ):
            _logger.warning(
                "View %s and instrument %s will produce "
                "semantic errors when matched, the view "
                "has not been applied.",
                view,
                instrument,
            )
            result = False

        return result