Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/aggregation.py: 31%

178 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from abc import ABC, abstractmethod 

16from bisect import bisect_left 

17from enum import IntEnum 

18from logging import getLogger 

19from math import inf 

20from threading import Lock 

21from typing import Generic, List, Optional, Sequence, TypeVar 

22 

23from opentelemetry.metrics import ( 

24 Asynchronous, 

25 Counter, 

26 Histogram, 

27 Instrument, 

28 ObservableCounter, 

29 ObservableGauge, 

30 ObservableUpDownCounter, 

31 Synchronous, 

32 UpDownCounter, 

33) 

34from opentelemetry.sdk.metrics._internal.measurement import Measurement 

35from opentelemetry.sdk.metrics._internal.point import Gauge 

36from opentelemetry.sdk.metrics._internal.point import ( 

37 Histogram as HistogramPoint, 

38) 

39from opentelemetry.sdk.metrics._internal.point import ( 

40 HistogramDataPoint, 

41 NumberDataPoint, 

42 Sum, 

43) 

44from opentelemetry.util.types import Attributes 

45 

46_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint) 

47 

48_logger = getLogger(__name__) 

49 

50 

class AggregationTemporality(IntEnum):
    """
    Temporality applied when measurements are aggregated into data points.

    Members and their integer values:

    - ``UNSPECIFIED``: 0
    - ``DELTA``: 1
    - ``CUMULATIVE``: 2
    """

    UNSPECIFIED = 0
    DELTA = 1
    CUMULATIVE = 2

61 

62 

class _Aggregation(ABC, Generic[_DataPointVarT]):
    """
    Abstract base holding the state shared by every concrete aggregation:
    the attributes of the stream, a lock and the previously produced point.
    """

    def __init__(self, attributes: Attributes):
        self._attributes = attributes
        # No point has been produced yet.
        self._previous_point = None
        # Subclasses use this lock to guard their accumulated state.
        self._lock = Lock()

    @abstractmethod
    def aggregate(self, measurement: Measurement) -> None:
        """Fold ``measurement`` into the accumulated state."""

    @abstractmethod
    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Return a data point for the accumulated state, or ``None`` when
        there is nothing to report.
        """

80 

81 

class _DropAggregation(_Aggregation):
    """Aggregation that ignores every measurement and never yields a point."""

    def aggregate(self, measurement: Measurement) -> None:
        # Intentionally a no-op: the measurement is discarded.
        return None

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        # Nothing is ever accumulated, so there is never a point to return.
        return None

92 

93 

class _SumAggregation(_Aggregation[Sum]):
    """
    Aggregation that keeps the arithmetic sum of the measured values.

    Args:
        attributes: Attributes attached to every data point produced.
        instrument_is_monotonic: Whether the instrument is monotonic. The
            value is stored on the instance; this class does not read it.
        instrument_temporality: Temporality in which the instrument reports
            its measurements.
        start_time_unix_nano: Start time used for the first data point.
    """

    def __init__(
        self,
        attributes: Attributes,
        instrument_is_monotonic: bool,
        instrument_temporality: AggregationTemporality,
        start_time_unix_nano: int,
    ):
        super().__init__(attributes)

        self._start_time_unix_nano = start_time_unix_nano
        self._instrument_temporality = instrument_temporality
        self._instrument_is_monotonic = instrument_is_monotonic

        # A DELTA instrument always has a value to report (0 when idle).
        # For any other temporality ``None`` means "nothing measured since
        # the last collection".
        if self._instrument_temporality is AggregationTemporality.DELTA:
            self._value = 0
        else:
            self._value = None

    def aggregate(self, measurement: Measurement) -> None:
        """Add ``measurement.value`` to the running sum."""
        with self._lock:
            if self._value is None:
                self._value = 0
            self._value = self._value + measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[NumberDataPoint]:
        """
        Atomically snapshot the accumulated value, reset it and return a
        point expressed in ``aggregation_temporality``.

        Returns ``None`` when the instrument temporality is not DELTA and no
        measurement has been aggregated since the previous collection.
        """
        if self._instrument_temporality is AggregationTemporality.DELTA:

            with self._lock:
                value = self._value
                start_time_unix_nano = self._start_time_unix_nano

                self._value = 0
                self._start_time_unix_nano = collection_start_nano

        else:

            with self._lock:
                if self._value is None:
                    return None
                value = self._value
                self._value = None
                start_time_unix_nano = self._start_time_unix_nano

        current_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            value=value,
        )

        previous_point = self._previous_point

        if previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
            # Output DELTA for a synchronous instrument
            # Output CUMULATIVE for an asynchronous instrument
            self._previous_point = current_point
            return current_point

        if aggregation_temporality is AggregationTemporality.DELTA:
            # Output temporality DELTA for an asynchronous instrument.
            # The baseline for the next collection must be this cumulative
            # reading, not the delta derived from it; storing the delta
            # would make the next subtraction use the wrong value.
            self._previous_point = current_point
            return NumberDataPoint(
                attributes=self._attributes,
                start_time_unix_nano=previous_point.time_unix_nano,
                time_unix_nano=current_point.time_unix_nano,
                value=current_point.value - previous_point.value,
            )

        # Output CUMULATIVE for a synchronous instrument: add the fresh delta
        # to the running total and keep that total as the new baseline.
        cumulative_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=previous_point.start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            value=current_point.value + previous_point.value,
        )

        self._previous_point = cumulative_point
        return cumulative_point

182 

183 

class _LastValueAggregation(_Aggregation[Gauge]):
    """Aggregation that remembers only the most recent measured value."""

    def __init__(self, attributes: Attributes):
        super().__init__(attributes)
        # ``None`` means no measurement since the last collection.
        self._value = None

    def aggregate(self, measurement: Measurement):
        """Overwrite the stored value with ``measurement.value``."""
        with self._lock:
            self._value = measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically take the stored value, clear it and return it as a point.

        Returns ``None`` when nothing was measured since the last collection.
        """
        with self._lock:
            # Read and clear in a single step while holding the lock.
            latest, self._value = self._value, None

        if latest is None:
            return None

        return NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=0,
            time_unix_nano=collection_start_nano,
            value=latest,
        )

213 

214 

class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
    """
    Histogram aggregation over a fixed set of bucket boundaries.

    Args:
        attributes: Attributes attached to every data point produced.
        start_time_unix_nano: Start time used for the first data point.
        boundaries: Bucket boundary values; stored as a tuple. There is one
            more bucket than there are boundaries.
        record_min_max: Whether ``aggregate`` tracks the minimum and maximum
            measured values.
    """

    def __init__(
        self,
        attributes: Attributes,
        start_time_unix_nano: int,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ):
        super().__init__(attributes)
        self._boundaries = tuple(boundaries)
        self._bucket_counts = self._get_empty_bucket_counts()
        self._min = inf
        self._max = -inf
        self._sum = 0
        self._record_min_max = record_min_max
        self._start_time_unix_nano = start_time_unix_nano
        # It is assumed that the "natural" aggregation temporality for a
        # Histogram instrument is DELTA, like the "natural" aggregation
        # temporality for a Counter is DELTA and the "natural" aggregation
        # temporality for an ObservableCounter is CUMULATIVE.
        self._instrument_temporality = AggregationTemporality.DELTA

    def _get_empty_bucket_counts(self) -> List[int]:
        """Return a fresh list of zero counts, one per bucket."""
        return [0] * (len(self._boundaries) + 1)

    def aggregate(self, measurement: Measurement) -> None:
        """Record ``measurement.value`` in the sum, min/max and its bucket."""
        value = measurement.value

        # ``collect`` snapshots and resets these fields under the same lock;
        # updating them without it could tear or lose a measurement recorded
        # while a collection is in progress.
        with self._lock:
            if self._record_min_max:
                self._min = min(self._min, value)
                self._max = max(self._max, value)

            self._sum += value

            # bisect_left puts a value equal to a boundary into the bucket
            # that ends at that boundary.
            self._bucket_counts[bisect_left(self._boundaries, value)] += 1

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically snapshot and reset the histogram state and return a point
        expressed in ``aggregation_temporality``.

        Returns ``None`` when every bucket count is zero.
        """
        with self._lock:
            if not any(self._bucket_counts):
                return None

            bucket_counts = self._bucket_counts
            start_time_unix_nano = self._start_time_unix_nano
            sum_ = self._sum
            max_ = self._max
            min_ = self._min

            self._bucket_counts = self._get_empty_bucket_counts()
            self._start_time_unix_nano = collection_start_nano
            self._sum = 0
            self._min = inf
            self._max = -inf

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=self._boundaries,
            min=min_,
            max=max_,
        )

        # First point, or no conversion needed: report the snapshot as is.
        if self._previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
            self._previous_point = current_point
            return current_point

        max_ = current_point.max
        min_ = current_point.min

        if aggregation_temporality is AggregationTemporality.CUMULATIVE:
            # delta -> cumulative: merge the snapshot into the running total.
            start_time_unix_nano = self._previous_point.start_time_unix_nano
            sum_ = current_point.sum + self._previous_point.sum
            # Only update min/max on delta -> cumulative
            max_ = max(current_point.max, self._previous_point.max)
            min_ = min(current_point.min, self._previous_point.min)
            bucket_counts = [
                curr_count + prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]
        else:
            # Any other requested temporality: subtract the previous point.
            start_time_unix_nano = self._previous_point.time_unix_nano
            sum_ = current_point.sum - self._previous_point.sum
            bucket_counts = [
                curr_count - prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=current_point.explicit_bounds,
            min=min_,
            max=max_,
        )
        self._previous_point = current_point
        return current_point

350 

351 

class Aggregation(ABC):
    """
    Common base class of every aggregation type.
    """

    @abstractmethod
    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Build the internal aggregation for ``instrument`` with the given
        ``attributes`` and ``start_time_unix_nano``.
        """

365 

366 

class DefaultAggregation(Aggregation):
    """
    The default aggregation to be used in a `View`.

    The concrete aggregation is picked from the instrument type:

    ==================================================== ====================================
    Instrument                                           Aggregation
    ==================================================== ====================================
    `opentelemetry.sdk.metrics.Counter`                  `SumAggregation`
    `opentelemetry.sdk.metrics.UpDownCounter`            `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableCounter`        `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableUpDownCounter`  `SumAggregation`
    `opentelemetry.sdk.metrics.Histogram`                `ExplicitBucketHistogramAggregation`
    `opentelemetry.sdk.metrics.ObservableGauge`          `LastValueAggregation`
    ==================================================== ====================================
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Return the internal aggregation matching the type of ``instrument``.

        Raises:
            Exception: If ``instrument`` is none of the supported types.
        """
        # (instrument type, monotonic?, instrument temporality), checked in
        # this order.
        sum_cases = (
            (Counter, True, AggregationTemporality.DELTA),
            (UpDownCounter, False, AggregationTemporality.DELTA),
            (ObservableCounter, True, AggregationTemporality.CUMULATIVE),
            (
                ObservableUpDownCounter,
                False,
                AggregationTemporality.CUMULATIVE,
            ),
        )

        for instrument_type, is_monotonic, temporality in sum_cases:
            if isinstance(instrument, instrument_type):
                return _SumAggregation(
                    attributes,
                    instrument_is_monotonic=is_monotonic,
                    instrument_temporality=temporality,
                    start_time_unix_nano=start_time_unix_nano,
                )

        if isinstance(instrument, Histogram):
            return _ExplicitBucketHistogramAggregation(
                attributes, start_time_unix_nano
            )

        if isinstance(instrument, ObservableGauge):
            return _LastValueAggregation(attributes)

        raise Exception(f"Invalid instrument type {type(instrument)} found")

434 

435 

class ExplicitBucketHistogramAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - Count of Measurement values falling within explicit bucket boundaries.
    - Arithmetic sum of Measurement values in population. This SHOULD NOT be
      collected when used with instruments that record negative
      measurements, e.g. UpDownCounter or ObservableGauge.
    - Min (optional) Measurement value in population.
    - Max (optional) Measurement value in population.

    Args:
        boundaries: Array of increasing values representing explicit bucket
            boundary values.
        record_min_max: Whether to record min and max.
    """

    def __init__(
        self,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ) -> None:
        self._record_min_max = record_min_max
        self._boundaries = boundaries

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Build the internal histogram aggregation using the boundaries and
        min/max setting given at construction time.
        """
        return _ExplicitBucketHistogramAggregation(
            attributes=attributes,
            start_time_unix_nano=start_time_unix_nano,
            boundaries=self._boundaries,
            record_min_max=self._record_min_max,
        )

486 

487 

class SumAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - The arithmetic sum of Measurement values.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Build the internal sum aggregation for ``instrument``.

        The instrument temporality is DELTA for a synchronous instrument,
        CUMULATIVE for an asynchronous one and UNSPECIFIED otherwise.
        """
        if isinstance(instrument, Synchronous):
            instrument_temporality = AggregationTemporality.DELTA
        elif isinstance(instrument, Asynchronous):
            instrument_temporality = AggregationTemporality.CUMULATIVE
        else:
            instrument_temporality = AggregationTemporality.UNSPECIFIED

        # Only the two counter kinds are treated as monotonic.
        is_monotonic = isinstance(instrument, (Counter, ObservableCounter))

        return _SumAggregation(
            attributes,
            instrument_is_monotonic=is_monotonic,
            instrument_temporality=instrument_temporality,
            start_time_unix_nano=start_time_unix_nano,
        )

513 

514 

class LastValueAggregation(Aggregation):
    """
    This aggregation informs the SDK to collect:

    - The last Measurement.
    - The timestamp of the last Measurement.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Build the internal last-value aggregation; only ``attributes`` is
        forwarded.
        """
        return _LastValueAggregation(attributes=attributes)

530 

531 

class DropAggregation(Aggregation):
    """Using this aggregation will make all measurements be ignored."""

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """
        Build the internal drop aggregation; only ``attributes`` is
        forwarded.
        """
        return _DropAggregation(attributes=attributes)