Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py: 48%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# pylint: disable=unused-import 

16 

17from abc import ABC, abstractmethod 

18from threading import Lock 

19from time import time_ns 

20from typing import Iterable, List, Mapping 

21 

22# This kind of import is needed to avoid Sphinx errors. 

23import opentelemetry.sdk.metrics 

24import opentelemetry.sdk.metrics._internal.instrument 

25import opentelemetry.sdk.metrics._internal.sdk_configuration 

26from opentelemetry.metrics._internal.instrument import CallbackOptions 

27from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError 

28from opentelemetry.sdk.metrics._internal.measurement import Measurement 

29from opentelemetry.sdk.metrics._internal.metric_reader_storage import ( 

30 MetricReaderStorage, 

31) 

32from opentelemetry.sdk.metrics._internal.point import Metric 

33 

34 

class MeasurementConsumer(ABC):
    """Abstract interface for objects that receive measurements.

    Implementations must accept individual measurements, keep track of
    registered asynchronous instruments and produce metrics on request
    for a given metric reader.
    """

    @abstractmethod
    def consume_measurement(self, measurement: Measurement) -> None:
        """Accept a single ``measurement``."""

    @abstractmethod
    def register_asynchronous_instrument(
        self,
        instrument: (
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ),
    ) -> None:
        """Register an asynchronous ``instrument`` with this consumer."""

    @abstractmethod
    def collect(
        self,
        metric_reader: "opentelemetry.sdk.metrics.MetricReader",
        timeout_millis: float = 10_000,
    ) -> Iterable[Metric]:
        """Return the metrics collected for ``metric_reader``.

        Args:
            metric_reader: The reader requesting the collection.
            timeout_millis: Time budget for the collection, in
                milliseconds.
        """

56 

57 

class SynchronousMeasurementConsumer(MeasurementConsumer):
    """Measurement consumer backed by one ``MetricReaderStorage`` per reader.

    Every consumed measurement is forwarded to the storage of each metric
    reader found in the SDK configuration.
    """

    def __init__(
        self,
        sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration",
    ) -> None:
        """Build a storage for every reader in ``sdk_config.metric_readers``.

        Args:
            sdk_config: SDK configuration; its ``metric_readers`` are
                iterated once here to create the per-reader storages.
        """
        self._lock = Lock()
        self._sdk_config = sdk_config
        # should never be mutated
        self._reader_storages: Mapping[
            "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage
        ] = {
            reader: MetricReaderStorage(
                sdk_config,
                reader._instrument_class_temporality,
                reader._instrument_class_aggregation,
            )
            for reader in sdk_config.metric_readers
        }
        self._async_instruments: List[
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ] = []

    def consume_measurement(self, measurement: Measurement) -> None:
        """Forward ``measurement`` to the storage of every metric reader."""
        for reader_storage in self._reader_storages.values():
            reader_storage.consume_measurement(measurement)

    def register_asynchronous_instrument(
        self,
        instrument: (
            "opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
        ),
    ) -> None:
        """Remember ``instrument`` so its callback runs on each collect."""
        with self._lock:
            self._async_instruments.append(instrument)

    def collect(
        self,
        metric_reader: "opentelemetry.sdk.metrics.MetricReader",
        timeout_millis: float = 10_000,
    ) -> Iterable[Metric]:
        """Run the asynchronous callbacks and collect for ``metric_reader``.

        Each registered asynchronous instrument's callback is invoked and
        the measurements it returns are fed into the storage belonging to
        ``metric_reader``; that storage is then collected.

        Args:
            metric_reader: The reader whose storage is collected.
            timeout_millis: Overall time budget, in milliseconds, for
                running the callbacks.

        Returns:
            Whatever the reader's storage ``collect()`` returns.

        Raises:
            KeyError: If ``metric_reader`` has no storage in this consumer.
            MetricsTimeoutError: If the deadline has passed once a callback
                returns.
        """
        with self._lock:
            metric_reader_storage = self._reader_storages[metric_reader]
            # for now, just use the defaults
            callback_options = CallbackOptions()
            deadline_ns = time_ns() + timeout_millis * 10**6

            # 10 000 ms expressed in nanoseconds, so it can be compared
            # directly with the nanosecond clock readings below.
            default_timeout_ns = 10_000 * 10**6

            for async_instrument in self._async_instruments:
                remaining_ns = deadline_ns - time_ns()

                if remaining_ns < default_timeout_ns:
                    # CallbackOptions takes milliseconds; the remaining
                    # budget is tracked in nanoseconds, so convert it.
                    callback_options = CallbackOptions(
                        timeout_millis=remaining_ns / 10**6
                    )

                measurements = async_instrument.callback(callback_options)
                # The callback is not interrupted; the deadline is only
                # checked after it has returned.
                if time_ns() >= deadline_ns:
                    raise MetricsTimeoutError(
                        "Timed out while executing callback"
                    )

                for measurement in measurements:
                    metric_reader_storage.consume_measurement(measurement)

        return self._reader_storages[metric_reader].collect()