Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py: 39%

120 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15# pylint: disable=too-many-ancestors, unused-import 

16 

17from logging import getLogger 

18from typing import Dict, Generator, Iterable, List, Optional, Union 

19 

20# This kind of import is needed to avoid Sphinx errors. 

21import opentelemetry.sdk.metrics 

22from opentelemetry.metrics import CallbackT 

23from opentelemetry.metrics import Counter as APICounter 

24from opentelemetry.metrics import Histogram as APIHistogram 

25from opentelemetry.metrics import ObservableCounter as APIObservableCounter 

26from opentelemetry.metrics import ObservableGauge as APIObservableGauge 

27from opentelemetry.metrics import ( 

28 ObservableUpDownCounter as APIObservableUpDownCounter, 

29) 

30from opentelemetry.metrics import UpDownCounter as APIUpDownCounter 

31from opentelemetry.metrics._internal.instrument import CallbackOptions 

32from opentelemetry.sdk.metrics._internal.measurement import Measurement 

33from opentelemetry.sdk.util.instrumentation import InstrumentationScope 

34 

35_logger = getLogger(__name__) 

36 

37 

38_ERROR_MESSAGE = ( 

39 "Expected ASCII string of maximum length 63 characters but got {}" 

40) 

41 

42 

43class _Synchronous: 

44 def __init__( 

45 self, 

46 name: str, 

47 instrumentation_scope: InstrumentationScope, 

48 measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer", 

49 unit: str = "", 

50 description: str = "", 

51 ): 

52 # pylint: disable=no-member 

53 result = self._check_name_unit_description(name, unit, description) 

54 

55 if result["name"] is None: 

56 raise Exception(_ERROR_MESSAGE.format(name)) 

57 

58 if result["unit"] is None: 

59 raise Exception(_ERROR_MESSAGE.format(unit)) 

60 

61 name = result["name"] 

62 unit = result["unit"] 

63 description = result["description"] 

64 

65 self.name = name.lower() 

66 self.unit = unit 

67 self.description = description 

68 self.instrumentation_scope = instrumentation_scope 

69 self._measurement_consumer = measurement_consumer 

70 super().__init__(name, unit=unit, description=description) 

71 

72 

73class _Asynchronous: 

74 def __init__( 

75 self, 

76 name: str, 

77 instrumentation_scope: InstrumentationScope, 

78 measurement_consumer: "opentelemetry.sdk.metrics.MeasurementConsumer", 

79 callbacks: Optional[Iterable[CallbackT]] = None, 

80 unit: str = "", 

81 description: str = "", 

82 ): 

83 # pylint: disable=no-member 

84 result = self._check_name_unit_description(name, unit, description) 

85 

86 if result["name"] is None: 

87 raise Exception(_ERROR_MESSAGE.format(name)) 

88 

89 if result["unit"] is None: 

90 raise Exception(_ERROR_MESSAGE.format(unit)) 

91 

92 name = result["name"] 

93 unit = result["unit"] 

94 description = result["description"] 

95 

96 self.name = name.lower() 

97 self.unit = unit 

98 self.description = description 

99 self.instrumentation_scope = instrumentation_scope 

100 self._measurement_consumer = measurement_consumer 

101 super().__init__(name, callbacks, unit=unit, description=description) 

102 

103 self._callbacks: List[CallbackT] = [] 

104 

105 if callbacks is not None: 

106 

107 for callback in callbacks: 

108 

109 if isinstance(callback, Generator): 

110 

111 # advance generator to it's first yield 

112 next(callback) 

113 

114 def inner( 

115 options: CallbackOptions, 

116 callback=callback, 

117 ) -> Iterable[Measurement]: 

118 try: 

119 return callback.send(options) 

120 except StopIteration: 

121 return [] 

122 

123 self._callbacks.append(inner) 

124 else: 

125 self._callbacks.append(callback) 

126 

127 def callback( 

128 self, callback_options: CallbackOptions 

129 ) -> Iterable[Measurement]: 

130 for callback in self._callbacks: 

131 try: 

132 for api_measurement in callback(callback_options): 

133 yield Measurement( 

134 api_measurement.value, 

135 instrument=self, 

136 attributes=api_measurement.attributes, 

137 ) 

138 except Exception: # pylint: disable=broad-except 

139 _logger.exception( 

140 "Callback failed for instrument %s.", self.name 

141 ) 

142 

143 

144class Counter(_Synchronous, APICounter): 

145 def __new__(cls, *args, **kwargs): 

146 if cls is Counter: 

147 raise TypeError("Counter must be instantiated via a meter.") 

148 return super().__new__(cls) 

149 

150 def add( 

151 self, amount: Union[int, float], attributes: Dict[str, str] = None 

152 ): 

153 if amount < 0: 

154 _logger.warning( 

155 "Add amount must be non-negative on Counter %s.", self.name 

156 ) 

157 return 

158 self._measurement_consumer.consume_measurement( 

159 Measurement(amount, self, attributes) 

160 ) 

161 

162 

163class UpDownCounter(_Synchronous, APIUpDownCounter): 

164 def __new__(cls, *args, **kwargs): 

165 if cls is UpDownCounter: 

166 raise TypeError("UpDownCounter must be instantiated via a meter.") 

167 return super().__new__(cls) 

168 

169 def add( 

170 self, amount: Union[int, float], attributes: Dict[str, str] = None 

171 ): 

172 self._measurement_consumer.consume_measurement( 

173 Measurement(amount, self, attributes) 

174 ) 

175 

176 

177class ObservableCounter(_Asynchronous, APIObservableCounter): 

178 def __new__(cls, *args, **kwargs): 

179 if cls is ObservableCounter: 

180 raise TypeError( 

181 "ObservableCounter must be instantiated via a meter." 

182 ) 

183 return super().__new__(cls) 

184 

185 

186class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter): 

187 def __new__(cls, *args, **kwargs): 

188 if cls is ObservableUpDownCounter: 

189 raise TypeError( 

190 "ObservableUpDownCounter must be instantiated via a meter." 

191 ) 

192 return super().__new__(cls) 

193 

194 

195class Histogram(_Synchronous, APIHistogram): 

196 def __new__(cls, *args, **kwargs): 

197 if cls is Histogram: 

198 raise TypeError("Histogram must be instantiated via a meter.") 

199 return super().__new__(cls) 

200 

201 def record( 

202 self, amount: Union[int, float], attributes: Dict[str, str] = None 

203 ): 

204 if amount < 0: 

205 _logger.warning( 

206 "Record amount must be non-negative on Histogram %s.", 

207 self.name, 

208 ) 

209 return 

210 self._measurement_consumer.consume_measurement( 

211 Measurement(amount, self, attributes) 

212 ) 

213 

214 

215class ObservableGauge(_Asynchronous, APIObservableGauge): 

216 def __new__(cls, *args, **kwargs): 

217 if cls is ObservableGauge: 

218 raise TypeError( 

219 "ObservableGauge must be instantiated via a meter." 

220 ) 

221 return super().__new__(cls) 

222 

223 

224# Below classes exist to prevent the direct instantiation 

225class _Counter(Counter): 

226 pass 

227 

228 

229class _UpDownCounter(UpDownCounter): 

230 pass 

231 

232 

233class _ObservableCounter(ObservableCounter): 

234 pass 

235 

236 

237class _ObservableUpDownCounter(ObservableUpDownCounter): 

238 pass 

239 

240 

241class _Histogram(Histogram): 

242 pass 

243 

244 

245class _ObservableGauge(ObservableGauge): 

246 pass