Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/_view_instrument_match.py: 29%

55 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from logging import getLogger 

17from threading import Lock 

18from time import time_ns 

19from typing import Dict, List, Sequence 

20 

21from opentelemetry.metrics import Instrument 

22from opentelemetry.sdk.metrics._internal.aggregation import ( 

23 Aggregation, 

24 DefaultAggregation, 

25 _Aggregation, 

26 _SumAggregation, 

27) 

28from opentelemetry.sdk.metrics._internal.export import AggregationTemporality 

29from opentelemetry.sdk.metrics._internal.measurement import Measurement 

30from opentelemetry.sdk.metrics._internal.point import DataPointT 

31from opentelemetry.sdk.metrics._internal.view import View 

32 

33_logger = getLogger(__name__) 

34 

35 

class _ViewInstrumentMatch:
    """State kept for one ``View`` paired with one ``Instrument``.

    Measurements are grouped by their (optionally filtered) attribute set;
    every distinct attribute set gets its own aggregation object, stored in
    ``_attributes_aggregation`` under a ``frozenset`` of the attribute items.
    """

    def __init__(
        self,
        view: View,
        instrument: Instrument,
        instrument_class_aggregation: Dict[type, Aggregation],
    ):
        """Record the pairing and build a reference aggregation.

        Args:
            view: The view whose name, description, attribute keys and
                aggregation settings are applied.
            instrument: The instrument the view is matched with.
            instrument_class_aggregation: Aggregation factory to use per
                instrument class when the view's aggregation is a
                ``DefaultAggregation``.
        """
        # Captured once; handed to every per-attribute aggregation created
        # later in ``consume_measurement``.
        self._start_time_unix_nano = time_ns()
        self._view = view
        self._instrument = instrument
        self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
        self._lock = Lock()
        self._instrument_class_aggregation = instrument_class_aggregation

        # Values set on the view win over the instrument's own.
        self._name = self._view._name or self._instrument.name
        self._description = (
            self._view._description or self._instrument.description
        )

        # Aggregation built with no attributes and a zero start time; within
        # this class it is only inspected by ``conflicts``.
        self._aggregation = self._new_aggregation(None, 0)

    # pylint: disable=protected-access
    def _new_aggregation(
        self, attributes, start_time_unix_nano: int
    ) -> _Aggregation:
        """Create an aggregation for this instrument.

        The view's own aggregation is used as the factory unless it is a
        ``DefaultAggregation``, in which case the factory registered for the
        instrument's class in ``_instrument_class_aggregation`` is used.

        Args:
            attributes: Attribute mapping (or ``None``) forwarded to the
                factory.
            start_time_unix_nano: Start time forwarded to the factory.
        """
        factory = self._view._aggregation
        if isinstance(factory, DefaultAggregation):
            factory = self._instrument_class_aggregation[
                self._instrument.__class__
            ]
        return factory._create_aggregation(
            self._instrument, attributes, start_time_unix_nano
        )

    def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
        """Tell whether ``other`` matches this object's stream identity.

        Name, instrument unit and aggregation class are compared; when this
        object's aggregation is a ``_SumAggregation`` the monotonicity and
        instrument temporality of both aggregations must match as well.
        """
        # pylint: disable=protected-access

        same_identity = (
            self._name == other._name
            and self._instrument.unit == other._instrument.unit
            # The aggregation class is being used here instead of data point
            # type since they are functionally equivalent.
            and self._aggregation.__class__ == other._aggregation.__class__
        )

        # Only sum aggregations that already match need the extra checks.
        if not same_identity or not isinstance(
            self._aggregation, _SumAggregation
        ):
            return same_identity

        mine = self._aggregation
        theirs = other._aggregation
        return (
            mine._instrument_is_monotonic == theirs._instrument_is_monotonic
            and mine._instrument_temporality
            == theirs._instrument_temporality
        )

    # pylint: disable=protected-access
    def consume_measurement(self, measurement: Measurement) -> None:
        """Route ``measurement`` to the aggregation for its attribute set.

        When the view declares attribute keys, only attributes whose key is
        among them are kept; otherwise the measurement's attributes are used
        unchanged (an empty mapping when they are ``None``). The aggregation
        for the resulting attribute set is created on first use.
        """
        allowed_keys = self._view._attribute_keys
        raw_attributes = measurement.attributes

        if allowed_keys is not None:
            attributes = {
                key: value
                for key, value in (raw_attributes or {}).items()
                if key in allowed_keys
            }
        else:
            attributes = raw_attributes if raw_attributes is not None else {}

        aggr_key = frozenset(attributes.items())

        # Unlocked membership test first; the test is repeated under the
        # lock before a new aggregation is inserted for this key.
        if aggr_key not in self._attributes_aggregation:
            with self._lock:
                if aggr_key not in self._attributes_aggregation:
                    self._attributes_aggregation[
                        aggr_key
                    ] = self._new_aggregation(
                        attributes, self._start_time_unix_nano
                    )

        self._attributes_aggregation[aggr_key].aggregate(measurement)

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nanos: int,
    ) -> Sequence[DataPointT]:
        """Collect one data point from every stored aggregation.

        Each aggregation's ``collect`` is called while holding the lock;
        results that are ``None`` are left out of the returned list.
        """
        with self._lock:
            # Lazy generator, fully consumed below while the lock is held.
            collected = (
                aggregation.collect(
                    aggregation_temporality, collection_start_nanos
                )
                for aggregation in self._attributes_aggregation.values()
            )
            data_points: List[DataPointT] = [
                point for point in collected if point is not None
            ]
        return data_points