Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/opentelemetry/sdk/metrics/_internal/__init__.py: 23%

163 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# Copyright The OpenTelemetry Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from atexit import register, unregister 

16from logging import getLogger 

17from threading import Lock 

18from time import time_ns 

19from typing import Optional, Sequence 

20 

21# This kind of import is needed to avoid Sphinx errors. 

22import opentelemetry.sdk.metrics 

23from opentelemetry.metrics import Counter as APICounter 

24from opentelemetry.metrics import Histogram as APIHistogram 

25from opentelemetry.metrics import Meter as APIMeter 

26from opentelemetry.metrics import MeterProvider as APIMeterProvider 

27from opentelemetry.metrics import NoOpMeter 

28from opentelemetry.metrics import ObservableCounter as APIObservableCounter 

29from opentelemetry.metrics import ObservableGauge as APIObservableGauge 

30from opentelemetry.metrics import ( 

31 ObservableUpDownCounter as APIObservableUpDownCounter, 

32) 

33from opentelemetry.metrics import UpDownCounter as APIUpDownCounter 

34from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError 

35from opentelemetry.sdk.metrics._internal.instrument import ( 

36 _Counter, 

37 _Histogram, 

38 _ObservableCounter, 

39 _ObservableGauge, 

40 _ObservableUpDownCounter, 

41 _UpDownCounter, 

42) 

43from opentelemetry.sdk.metrics._internal.measurement_consumer import ( 

44 MeasurementConsumer, 

45 SynchronousMeasurementConsumer, 

46) 

47from opentelemetry.sdk.metrics._internal.sdk_configuration import ( 

48 SdkConfiguration, 

49) 

50from opentelemetry.sdk.resources import Resource 

51from opentelemetry.sdk.util.instrumentation import InstrumentationScope 

52from opentelemetry.util._once import Once 

53 

54_logger = getLogger(__name__) 

55 

56 

class Meter(APIMeter):
    """See `opentelemetry.metrics.Meter`.

    Creates instruments bound to this meter's instrumentation scope and
    measurement consumer. Every instrument that gets created is cached under
    the id returned by ``_is_instrument_registered``; a repeated request for
    an already registered instrument logs a warning and returns the cached
    instance instead of building a new one.
    """

    def __init__(
        self,
        instrumentation_scope: InstrumentationScope,
        measurement_consumer: MeasurementConsumer,
    ):
        """Store the scope and consumer and set up the instrument cache.

        Args:
            instrumentation_scope: Scope passed to every created instrument
                (and to the parent class initializer).
            measurement_consumer: Consumer passed to every created
                instrument; asynchronous instruments are also registered
                with it.
        """
        super().__init__(instrumentation_scope)
        self._instrumentation_scope = instrumentation_scope
        self._measurement_consumer = measurement_consumer
        # Maps instrument id -> created instrument. Only read or written
        # while holding ``_instrument_id_instrument_lock``.
        self._instrument_id_instrument = {}
        self._instrument_id_instrument_lock = Lock()

    def _get_or_create_instrument(
        self,
        sdk_instrument_class,
        api_instrument_class,
        name,
        unit,
        description,
        callbacks=None,
        asynchronous=False,
    ):
        """Return the cached instrument for these arguments or create it.

        Shared implementation behind all the public ``create_*`` methods.

        Args:
            sdk_instrument_class: Concrete instrument class to instantiate;
                also used as the type in the registration check.
            api_instrument_class: API class whose ``__name__`` is reported
                in the duplicate-registration warning.
            name: Instrument name.
            unit: Instrument unit.
            description: Instrument description.
            callbacks: Callbacks handed to an asynchronous instrument;
                ignored when ``asynchronous`` is false.
            asynchronous: When true, ``callbacks`` is passed to the
                constructor and the new instrument is registered with the
                measurement consumer before being cached.

        Returns:
            The previously cached instrument if one is registered already,
            otherwise the newly created instrument.
        """
        # ``_is_instrument_registered`` is not defined in this class; it is
        # presumably inherited from ``APIMeter`` -- TODO confirm.
        (
            is_instrument_registered,
            instrument_id,
        ) = self._is_instrument_registered(
            name, sdk_instrument_class, unit, description
        )

        if is_instrument_registered:
            # FIXME #2558 go through all views here and check if this
            # instrument registration conflict can be fixed. If it can be, do
            # not log the following warning.
            _logger.warning(
                "An instrument with name %s, type %s, unit %s and "
                "description %s has been created already.",
                name,
                api_instrument_class.__name__,
                unit,
                description,
            )
            # NOTE(review): the registration check and the cache store below
            # are separate steps; if another thread can be between them this
            # lookup may raise KeyError -- confirm.
            with self._instrument_id_instrument_lock:
                return self._instrument_id_instrument[instrument_id]

        if asynchronous:
            instrument = sdk_instrument_class(
                name,
                self._instrumentation_scope,
                self._measurement_consumer,
                callbacks,
                unit,
                description,
            )
            # Asynchronous instruments are registered with the consumer
            # before they are placed in the cache.
            self._measurement_consumer.register_asynchronous_instrument(
                instrument
            )
        else:
            instrument = sdk_instrument_class(
                name,
                self._instrumentation_scope,
                self._measurement_consumer,
                unit,
                description,
            )

        with self._instrument_id_instrument_lock:
            self._instrument_id_instrument[instrument_id] = instrument
            return instrument

    def create_counter(self, name, unit="", description="") -> APICounter:
        """Return a counter, reusing the cached one if already registered."""
        return self._get_or_create_instrument(
            _Counter, APICounter, name, unit, description
        )

    def create_up_down_counter(
        self, name, unit="", description=""
    ) -> APIUpDownCounter:
        """Return an up-down counter, reusing the cached one if registered."""
        return self._get_or_create_instrument(
            _UpDownCounter, APIUpDownCounter, name, unit, description
        )

    def create_observable_counter(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableCounter:
        """Return an observable counter registered with the consumer."""
        return self._get_or_create_instrument(
            _ObservableCounter,
            APIObservableCounter,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )

    def create_histogram(self, name, unit="", description="") -> APIHistogram:
        """Return a histogram, reusing the cached one if already registered."""
        return self._get_or_create_instrument(
            _Histogram, APIHistogram, name, unit, description
        )

    def create_observable_gauge(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableGauge:
        """Return an observable gauge registered with the consumer."""
        return self._get_or_create_instrument(
            _ObservableGauge,
            APIObservableGauge,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )

    def create_observable_up_down_counter(
        self, name, callbacks=None, unit="", description=""
    ) -> APIObservableUpDownCounter:
        """Return an observable up-down counter registered with the consumer."""
        return self._get_or_create_instrument(
            _ObservableUpDownCounter,
            APIObservableUpDownCounter,
            name,
            unit,
            description,
            callbacks=callbacks,
            asynchronous=True,
        )

298 

299 

class MeterProvider(APIMeterProvider):
    r"""See `opentelemetry.metrics.MeterProvider`.

    Args:
        metric_readers: Register metric readers to collect metrics from the SDK
            on demand. Each :class:`opentelemetry.sdk.metrics.export.MetricReader` is
            completely independent and will collect separate streams of
            metrics. TODO: reference ``PeriodicExportingMetricReader`` usage with push
            exporters here.
        resource: The resource representing what the metrics emitted from the SDK pertain to.
            When omitted or ``None``, ``Resource.create({})`` is called while
            the provider is being constructed.
        shutdown_on_exit: If true, registers an `atexit` handler to call
            `MeterProvider.shutdown`
        views: The views to configure the metric output the SDK

    Raises:
        Exception: If one of ``metric_readers`` has already been registered
            with a ``MeterProvider`` instance.

    By default, instruments which do not match any :class:`opentelemetry.sdk.metrics.view.View` (or if no :class:`opentelemetry.sdk.metrics.view.View`\ s
    are provided) will report metrics with the default aggregation for the
    instrument's kind. To disable instruments by default, configure a match-all
    :class:`opentelemetry.sdk.metrics.view.View` with `DropAggregation` and then create :class:`opentelemetry.sdk.metrics.view.View`\ s to re-enable
    individual instruments:

    .. code-block:: python
        :caption: Disable default views

        MeterProvider(
            views=[
                View(instrument_name="*", aggregation=DropAggregation()),
                View(instrument_name="mycounter"),
            ],
            # ...
        )
    """

    # Class-level state: the set of every metric reader handed to any
    # MeterProvider instance, and the lock that guards it.
    _all_metric_readers_lock = Lock()
    _all_metric_readers = set()

    def __init__(
        self,
        metric_readers: Sequence[
            "opentelemetry.sdk.metrics.export.MetricReader"
        ] = (),
        resource: Optional[Resource] = None,
        shutdown_on_exit: bool = True,
        views: Sequence["opentelemetry.sdk.metrics.view.View"] = (),
    ):
        # Build the default resource here rather than in the signature: a
        # default expression in the signature is evaluated only once, when the
        # class body is executed, and the resulting object would then be
        # shared by every provider created without an explicit resource.
        if resource is None:
            resource = Resource.create({})

        self._lock = Lock()
        self._meter_lock = Lock()
        self._atexit_handler = None
        self._sdk_config = SdkConfiguration(
            resource=resource,
            metric_readers=metric_readers,
            views=views,
        )
        self._measurement_consumer = SynchronousMeasurementConsumer(
            sdk_config=self._sdk_config
        )

        # NOTE(review): the handler is registered before the reader checks
        # below, so it stays registered if one of those checks raises.
        if shutdown_on_exit:
            self._atexit_handler = register(self.shutdown)

        # Maps InstrumentationScope -> Meter; guarded by ``_meter_lock``.
        self._meters = {}
        self._shutdown_once = Once()
        self._shutdown = False

        for metric_reader in self._sdk_config.metric_readers:

            with self._all_metric_readers_lock:
                if metric_reader in self._all_metric_readers:
                    raise Exception(
                        f"MetricReader {metric_reader} has been registered "
                        "already in other MeterProvider instance"
                    )

                self._all_metric_readers.add(metric_reader)

            metric_reader._set_collect_callback(
                self._measurement_consumer.collect
            )

    @staticmethod
    def _format_metric_reader_errors(metric_reader_error) -> str:
        """Render one ``ClassName: repr(error)`` line per failed reader."""
        return "\n".join(
            f"{metric_reader.__class__.__name__}: {repr(error)}"
            for metric_reader, error in metric_reader_error.items()
        )

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Call ``force_flush`` on every metric reader within a shared deadline.

        Each reader is given the time remaining until the overall deadline.
        A failure in one reader does not stop the others from being flushed.

        Args:
            timeout_millis: Total time budget, in milliseconds, for all
                readers together.

        Returns:
            ``True`` when no reader raised.

        Raises:
            Exception: If at least one reader raised, or could not be flushed
                because the deadline had already passed. The message lists
                every failing reader.
        """
        deadline_ns = time_ns() + timeout_millis * 10**6

        metric_reader_error = {}

        for metric_reader in self._sdk_config.metric_readers:
            current_ts = time_ns()
            try:
                if current_ts >= deadline_ns:
                    raise MetricsTimeoutError(
                        "Timed out while flushing metric readers"
                    )
                metric_reader.force_flush(
                    timeout_millis=(deadline_ns - current_ts) / 10**6
                )

            # Collect every failure so that all readers get their turn.
            # pylint: disable=broad-except
            except Exception as error:

                metric_reader_error[metric_reader] = error

        if metric_reader_error:

            metric_reader_error_string = self._format_metric_reader_errors(
                metric_reader_error
            )

            raise Exception(
                "MeterProvider.force_flush failed because the following "
                "metric readers failed during collect:\n"
                f"{metric_reader_error_string}"
            )
        return True

    def shutdown(self, timeout_millis: float = 30_000):
        """Shut down every metric reader within a shared deadline.

        Only the first call does any work; later calls log a warning and
        return. The `atexit` handler, if one was registered, is removed.

        Args:
            timeout_millis: Total time budget, in milliseconds, for all
                readers together.

        Raises:
            Exception: If at least one reader raised, or could not be shut
                down because the deadline had already passed. The message
                lists every failing reader.
        """
        deadline_ns = time_ns() + timeout_millis * 10**6

        def _shutdown():
            self._shutdown = True

        did_shutdown = self._shutdown_once.do_once(_shutdown)

        if not did_shutdown:
            _logger.warning("shutdown can only be called once")
            return

        metric_reader_error = {}

        for metric_reader in self._sdk_config.metric_readers:
            current_ts = time_ns()
            try:
                if current_ts >= deadline_ns:
                    raise Exception(
                        "Didn't get to execute, deadline already exceeded"
                    )
                metric_reader.shutdown(
                    timeout_millis=(deadline_ns - current_ts) / 10**6
                )

            # Collect every failure so that all readers get their turn.
            # pylint: disable=broad-except
            except Exception as error:

                metric_reader_error[metric_reader] = error

        if self._atexit_handler is not None:
            unregister(self._atexit_handler)
            self._atexit_handler = None

        if metric_reader_error:

            metric_reader_error_string = self._format_metric_reader_errors(
                metric_reader_error
            )

            raise Exception(
                (
                    "MeterProvider.shutdown failed because the following "
                    "metric readers failed during shutdown:\n"
                    f"{metric_reader_error_string}"
                )
            )

    def get_meter(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> Meter:
        """Return the meter for the given instrumentation scope.

        Meters are cached per ``InstrumentationScope(name, version,
        schema_url)``, so repeated calls with the same arguments return the
        same object.

        Args:
            name: Name of the instrumentation scope.
            version: Optional version of the instrumentation scope.
            schema_url: Optional schema URL of the instrumentation scope.

        Returns:
            A ``Meter`` for the scope. A ``NoOpMeter`` is returned instead,
            with a warning logged, when the provider has been shut down or
            when ``name`` is ``None`` or empty.
        """

        if self._shutdown:
            _logger.warning(
                "A shutdown `MeterProvider` can not provide a `Meter`"
            )
            return NoOpMeter(name, version=version, schema_url=schema_url)

        if not name:
            _logger.warning("Meter name cannot be None or empty.")
            return NoOpMeter(name, version=version, schema_url=schema_url)

        info = InstrumentationScope(name, version, schema_url)
        with self._meter_lock:
            if info not in self._meters:
                # FIXME #2558 pass SDKConfig object to meter so that the meter
                # has access to views.
                self._meters[info] = Meter(
                    info,
                    self._measurement_consumer,
                )
            return self._meters[info]