Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/metrics.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

318 statements  

1import os 

2from threading import Lock 

3import time 

4import types 

5from typing import ( 

6 Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, 

7 Type, TypeVar, Union, 

8) 

9import warnings 

10 

11from . import values # retain this import style for testability 

12from .context_managers import ExceptionCounter, InprogressTracker, Timer 

13from .metrics_core import Metric 

14from .registry import Collector, CollectorRegistry, REGISTRY 

15from .samples import Exemplar, Sample 

16from .utils import floatToGoString, INF 

17from .validation import ( 

18 _validate_exemplar, _validate_labelnames, _validate_metric_name, 

19) 

20 

# Self-type used by MetricWrapperBase so that methods such as ``labels()``
# are typed as returning the concrete subclass they were called on.
T = TypeVar('T', bound='MetricWrapperBase')

# Generic "any callable" type variable.
# NOTE(review): not referenced by the code visible in this file — confirm
# whether external code imports it before removing.
F = TypeVar("F", bound=Callable[..., Any])

23 

24 

def _build_full_name(metric_type, name, namespace, subsystem, unit):
    """Compose the full metric name from its parts.

    The result is ``[namespace_][subsystem_]name[_unit]``. Empty
    ``namespace`` / ``subsystem`` parts are skipped, and the unit suffix is
    only appended when the name does not already end with it.

    For ``metric_type == 'counter'`` a trailing ``_total`` is stripped from
    the joined name before the unit is considered.

    Raises:
        ValueError: if ``name`` is empty, or if a ``unit`` is given for an
            ``'info'`` or ``'stateset'`` metric.
    """
    if not name:
        raise ValueError('Metric name should not be empty')

    # Only non-empty prefixes contribute a "<prefix>_" segment.
    segments = [prefix for prefix in (namespace, subsystem) if prefix]
    segments.append(name)
    full_name = '_'.join(segments)

    if metric_type == 'counter':
        full_name = full_name.removesuffix('_total')  # Munge to OpenMetrics.

    if unit:
        unit_suffix = "_" + unit
        if not full_name.endswith(unit_suffix):
            full_name += unit_suffix
        if metric_type in ('info', 'stateset'):
            raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name)

    return full_name

41 

42 

43 

def _get_use_created() -> bool:
    """Read the PROMETHEUS_DISABLE_CREATED_SERIES environment switch.

    Returns False when the variable is set (case-insensitively) to
    ``true``, ``1`` or ``t``; returns True for any other value or when the
    variable is unset.
    """
    raw_setting = os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False')
    disabled = raw_setting.lower() in ('true', '1', 't')
    return not disabled


# Module-wide switch, initialised from the environment once at import time.
# It is flipped at runtime by disable_created_metrics() /
# enable_created_metrics() and read when samples are collected.
_use_created = _get_use_created()

49 

50 

def disable_created_metrics():
    """Disable exporting _created metrics on counters, histograms, and summaries.

    Sets the module-level ``_use_created`` switch to False, overriding
    whatever value it currently holds.
    """
    global _use_created
    _use_created = False

55 

56 

def enable_created_metrics():
    """Enable exporting _created metrics on counters, histograms, and summaries.

    Sets the module-level ``_use_created`` switch to True, overriding
    whatever value it currently holds.
    """
    global _use_created
    _use_created = True

61 

62 

class MetricWrapperBase(Collector):
    """Base class for the metric types defined in this module.

    An instance plays one of two roles, decided by its label configuration:

    * a *parent*: label names were declared but no label values are bound.
      It stores no value itself; it owns a dict of child metrics keyed by
      label-value tuples and hands them out through ``labels()``.
    * an *observable* metric: either no label names were declared, or it is
      a child with label values bound. ``_metric_init()`` is invoked for it
      so the subclass can set up its value storage.

    Subclasses set ``_type`` and implement ``_metric_init()`` and
    ``_child_samples()``.
    """
    # Metric type string passed to Metric() and to name building.
    _type: Optional[str] = None
    # Label names a subclass reserves for its own samples.
    _reserved_labelnames: Sequence[str] = ()

    def _is_observable(self) -> bool:
        # Whether this metric is observable, i.e.
        # * a metric without label names and values, or
        # * the child of a labelled metric.
        return not self._labelnames or bool(self._labelvalues)

    def _raise_if_not_observable(self) -> None:
        # Functions that mutate the state of the metric, for example incrementing
        # a counter, will fail if the metric is not observable, because only if a
        # metric is observable will the value be initialized.
        if not self._is_observable():
            raise ValueError('%s metric is missing label values' % str(self._type))

    def _is_parent(self) -> bool:
        # A parent has label names declared but no label values bound yet.
        return bool(self._labelnames) and not self._labelvalues

    def _get_metric(self) -> Metric:
        """Return a fresh, sample-less Metric describing this metric."""
        return Metric(self._name, self._documentation, self._type, self._unit)

    def describe(self) -> Iterable[Metric]:
        """Return the metric description without collecting any samples."""
        return [self._get_metric()]

    def collect(self) -> Iterable[Metric]:
        """Return a single Metric populated with the current samples.

        Each sample produced by ``_samples()`` carries only a name suffix;
        the full sample name is formed by prefixing the metric name.
        """
        metric = self._get_metric()
        for suffix, labels, value, timestamp, exemplar, native_histogram_value in self._samples():
            metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar, native_histogram_value)
        return [metric]

    def __str__(self) -> str:
        return f"{self._type}:{self._name}"

    def __repr__(self) -> str:
        metric_type = type(self)
        return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"

    def __init__(self: T,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 ) -> None:
        """Build the metric and, unless it is a child, register it.

        Args:
            name: Metric name, without namespace/subsystem/unit.
            documentation: Help text stored on the metric.
            labelnames: Names of the labels; empty for an unlabelled metric.
            namespace: Optional name prefix.
            subsystem: Optional name prefix placed after ``namespace``.
            unit: Optional unit, appended to the name as a suffix.
            registry: Registry to register with; a falsy value skips
                registration.
            _labelvalues: Internal; set when ``labels()`` creates a child.

        Raises:
            ValueError: propagated from ``_build_full_name`` for an empty
                name or a unit on a type that cannot have one. The label
                name and metric name validators are also called here.
        """
        # The original components are kept so labels() can rebuild children
        # with the same arguments.
        self._original_name = name
        self._namespace = namespace
        self._subsystem = subsystem
        self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
        self._labelnames = _validate_labelnames(self, labelnames)
        self._labelvalues = tuple(_labelvalues or ())
        # Extra constructor kwargs that subclasses want forwarded to children.
        self._kwargs: Dict[str, Any] = {}
        self._documentation = documentation
        self._unit = unit

        _validate_metric_name(self._name)

        if self._is_parent():
            # Prepare the fields needed for child metrics.
            self._lock = Lock()
            self._metrics: Dict[Sequence[str], T] = {}

        if self._is_observable():
            self._metric_init()

        if not self._labelvalues:
            # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
            if registry:
                registry.register(self)

    def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
        """Return the child for the given labelset.

        All metrics can have labels, allowing grouping of related time series.
        Taking a counter as an example:

            from prometheus_client import Counter

            c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
            c.labels('get', '/').inc()
            c.labels('post', '/submit').inc()

        Labels can also be provided as keyword arguments:

            from prometheus_client import Counter

            c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
            c.labels(method='get', endpoint='/').inc()
            c.labels(method='post', endpoint='/submit').inc()

        Label values are converted with ``str()``. The child for a labelset
        is created on first use and the same object is returned afterwards.

        Raises:
            ValueError: if the metric has no label names, if this is already
                a child, if positional and keyword values are mixed, or if
                the names/count do not match the declared label names.

        See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
        and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
        """
        if not self._labelnames:
            raise ValueError('No label names were set when constructing %s' % self)

        if self._labelvalues:
            raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format(
                self,
                dict(zip(self._labelnames, self._labelvalues))
            ))

        if labelvalues and labelkwargs:
            raise ValueError("Can't pass both *args and **kwargs")

        if labelkwargs:
            if sorted(labelkwargs) != sorted(self._labelnames):
                raise ValueError('Incorrect label names')
            # Order keyword values to match the declared label names.
            labelvalues = tuple(str(labelkwargs[label]) for label in self._labelnames)
        else:
            if len(labelvalues) != len(self._labelnames):
                raise ValueError('Incorrect label count')
            labelvalues = tuple(str(value) for value in labelvalues)
        with self._lock:
            if labelvalues not in self._metrics:

                original_name = getattr(self, '_original_name', self._name)
                namespace = getattr(self, '_namespace', '')
                subsystem = getattr(self, '_subsystem', '')
                unit = getattr(self, '_unit', '')

                # These three are passed explicitly below; drop them from
                # the forwarded kwargs so they are not supplied twice.
                child_kwargs = dict(self._kwargs) if self._kwargs else {}
                for k in ('namespace', 'subsystem', 'unit'):
                    child_kwargs.pop(k, None)

                self._metrics[labelvalues] = self.__class__(
                    original_name,
                    documentation=self._documentation,
                    labelnames=self._labelnames,
                    namespace=namespace,
                    subsystem=subsystem,
                    unit=unit,
                    _labelvalues=labelvalues,
                    **child_kwargs
                )
            return self._metrics[labelvalues]

    def remove(self, *labelvalues: Any) -> None:
        """Remove the given labelset from the metric.

        Values are converted with ``str()`` before lookup. Removing a
        labelset that does not exist is a no-op. A ``UserWarning`` is
        emitted when a multiprocess directory environment variable is set.

        Raises:
            ValueError: if the metric has no label names or the number of
                values does not match the number of label names.
        """
        if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
            warnings.warn(
                "Removal of labels has not been implemented in multi-process mode yet.",
                UserWarning)

        if not self._labelnames:
            raise ValueError('No label names were set when constructing %s' % self)

        if len(labelvalues) != len(self._labelnames):
            raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
        labelvalues = tuple(str(value) for value in labelvalues)
        with self._lock:
            # pop with a default: removing an unknown labelset is a no-op.
            self._metrics.pop(labelvalues, None)

    def remove_by_labels(self, labels: Dict[str, str]) -> None:
        """Remove all series whose labelset partially matches the given labels.

        Args:
            labels: Mapping of label name to wanted value. A child is
                removed when every listed label has the given value
                (compared after ``str()``); unlisted labels may hold any
                value. An empty mapping removes nothing.

        Raises:
            ValueError: if the metric has no label names or ``labels``
                contains a name that is not one of them.
            TypeError: if ``labels`` is not a dict.
        """
        if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
            warnings.warn(
                "Removal of labels has not been implemented in multi-process mode yet.",
                UserWarning
            )

        if not self._labelnames:
            raise ValueError('No label names were set when constructing %s' % self)

        if not isinstance(labels, dict):
            raise TypeError("labels must be a dict of {label_name: label_value}")

        if not labels:
            return  # no operation

        invalid = [k for k in labels if k not in self._labelnames]
        if invalid:
            raise ValueError(
                'Unknown label names: %s; expected %s' % (invalid, self._labelnames)
            )

        # Map each filtered label to its position in the label-value tuples.
        pos_filter = {self._labelnames.index(k): str(v) for k, v in labels.items()}

        with self._lock:
            # list(...) to avoid "dictionary changed size during iteration"
            for lv in list(self._metrics.keys()):
                if all(lv[pos] == want for pos, want in pos_filter.items()):
                    # pop with default avoids KeyError if concurrently removed
                    self._metrics.pop(lv, None)

    def clear(self) -> None:
        """Remove all labelsets from the metric"""
        if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
            warnings.warn(
                "Clearing labels has not been implemented in multi-process mode yet",
                UserWarning)
        with self._lock:
            self._metrics = {}

    def _samples(self) -> Iterable[Sample]:
        """Return this metric's samples: children's for a parent, own otherwise."""
        if self._is_parent():
            return self._multi_samples()
        else:
            return self._child_samples()

    def _multi_samples(self) -> Iterable[Sample]:
        """Yield every child's samples with the child's label values merged in."""
        # Snapshot the children under the lock, then iterate the copy
        # without holding it.
        with self._lock:
            metrics = self._metrics.copy()
        for labels, metric in metrics.items():
            series_labels = list(zip(self._labelnames, labels))
            for suffix, sample_labels, value, timestamp, exemplar, native_histogram_value in metric._samples():
                yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar, native_histogram_value)

    def _child_samples(self) -> Iterable[Sample]:  # pragma: no cover
        """Return the samples of an observable metric; subclasses must override."""
        raise NotImplementedError('_child_samples() must be implemented by %r' % self)

    def _metric_init(self):  # pragma: no cover
        """
        Initialize the metric object as a child, i.e. when it has labels (if any) set.

        This is factored as a separate function to allow for deferred initialization.
        """
        raise NotImplementedError('_metric_init() must be implemented by %r' % self)

288 

289 

class Counter(MetricWrapperBase):
    """A Counter tracks counts of events or running totals.

    Example use cases for Counters:
    - Number of requests processed
    - Number of items that were inserted into a queue
    - Total amount of data that a system has processed

    Counters can only go up (and be reset when the process restarts).  If your use case can go down,
    you should use a Gauge instead.

    An example for a Counter:

        from prometheus_client import Counter

        c = Counter('my_failures_total', 'Description of counter')
        c.inc()     # Increment by 1
        c.inc(1.6)  # Increment by given value

    There are utilities to count exceptions raised:

        @c.count_exceptions()
        def f():
            pass

        with c.count_exceptions():
            pass

        # Count only one type of exception
        with c.count_exceptions(ValueError):
            pass

    You can also reset the counter to zero in case your logical "process" restarts
    without restarting the actual python process.

        c.reset()

    """
    _type = 'counter'

    def _metric_init(self) -> None:
        """Create the value store for the ``_total`` series and stamp creation time."""
        self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
                                        self._labelvalues, self._documentation)
        self._created = time.time()

    def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
        """Increment counter by the given amount.

        Args:
            amount: Non-negative increment.
            exemplar: Optional exemplar labels; when given they are
                validated and attached together with ``amount`` and the
                current time.

        Raises:
            ValueError: if the metric is a labelled parent without label
                values, or if ``amount`` is negative.
        """
        self._raise_if_not_observable()
        if amount < 0:
            raise ValueError('Counters can only be incremented by non-negative amounts.')
        self._value.inc(amount)
        if exemplar:
            _validate_exemplar(exemplar)
            self._value.set_exemplar(Exemplar(exemplar, amount, time.time()))

    def reset(self) -> None:
        """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process.

        The creation timestamp is refreshed as well.

        Raises:
            ValueError: if the metric is a labelled parent without label
                values.
        """
        # Same guard as inc(): a labelled parent has no value store, so fail
        # with the explicit "missing label values" error rather than an
        # AttributeError on _value.
        self._raise_if_not_observable()
        self._value.set(0)
        self._created = time.time()

    def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter:
        """Count exceptions in a block of code or function.

        Can be used as a function decorator or context manager.
        Increments the counter when an exception of the given
        type is raised up out of the code.
        """
        self._raise_if_not_observable()
        return ExceptionCounter(self, exception)

    def _child_samples(self) -> Iterable[Sample]:
        """Return the ``_total`` sample, plus ``_created`` when enabled."""
        sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar())
        if _use_created:
            return (
                sample,
                Sample('_created', {}, self._created, None, None)
            )
        return (sample,)

368 

369 

class Gauge(MetricWrapperBase):
    """Gauge metric, to report instantaneous values.

    Typical things to expose as a Gauge:
    - Inprogress requests
    - Number of items in a queue
    - Free memory
    - Total memory
    - Temperature

    A Gauge may move in either direction:

        from prometheus_client import Gauge

        g = Gauge('my_inprogress_requests', 'Description of gauge')
        g.inc()      # Increment by 1
        g.dec(10)    # Decrement by given value
        g.set(4.2)   # Set to a given value

    Helpers for common patterns:

        g.set_to_current_time()  # Set to current unixtime

        # Increment when entered, decrement when exited.
        @g.track_inprogress()
        def f():
            pass

        with g.track_inprogress():
            pass

    The value can also be supplied by a callback:

        d = Gauge('data_objects', 'Number of objects')
        my_dict = {}
        d.set_function(lambda: len(my_dict))
    """
    _type = 'gauge'
    # Every accepted value of the ``multiprocess_mode`` constructor argument.
    _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
    # Modes in which set() records a timestamp and inc()/dec() are rejected.
    _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
                 ):
        """Create a gauge.

        ``multiprocess_mode`` must be one of ``_MULTIPROC_MODES``; any other
        value raises ValueError before the base initializer runs. The
        remaining arguments are forwarded unchanged to the base class.
        """
        # Stored first: _metric_init(), called by the base initializer,
        # reads this attribute.
        self._multiprocess_mode = multiprocess_mode
        if multiprocess_mode not in self._MULTIPROC_MODES:
            raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
        super().__init__(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        # Forwarded to children created through labels().
        self._kwargs['multiprocess_mode'] = self._multiprocess_mode
        self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES

    def _metric_init(self) -> None:
        """Create the value store for this gauge's single series."""
        self._value = values.ValueClass(
            self._type, self._name, self._name, self._labelnames, self._labelvalues,
            self._documentation, multiprocess_mode=self._multiprocess_mode
        )

    def inc(self, amount: float = 1) -> None:
        """Increment gauge by the given amount.

        Raises RuntimeError in a "most recent" multiprocess mode and
        ValueError when label values are missing.
        """
        if self._is_most_recent:
            raise RuntimeError("inc must not be used with the mostrecent mode")
        self._raise_if_not_observable()
        self._value.inc(amount)

    def dec(self, amount: float = 1) -> None:
        """Decrement gauge by the given amount.

        Raises RuntimeError in a "most recent" multiprocess mode and
        ValueError when label values are missing.
        """
        if self._is_most_recent:
            raise RuntimeError("dec must not be used with the mostrecent mode")
        self._raise_if_not_observable()
        # Decrementing is an increment by the negated amount.
        self._value.inc(-amount)

    def set(self, value: float) -> None:
        """Set gauge to the given value.

        The value is converted with ``float()``. In a "most recent" mode
        the current time is stored alongside it.
        """
        self._raise_if_not_observable()
        number = float(value)
        if self._is_most_recent:
            self._value.set(number, timestamp=time.time())
        else:
            self._value.set(number)

    def set_to_current_time(self) -> None:
        """Set gauge to the current unixtime."""
        now = time.time()
        self.set(now)

    def track_inprogress(self) -> InprogressTracker:
        """Track inprogress blocks of code or functions.

        Can be used as a function decorator or context manager.
        Increments the gauge when the code is entered,
        and decrements when it is exited.
        """
        self._raise_if_not_observable()
        return InprogressTracker(self)

    def time(self) -> Timer:
        """Time a block of code or function, and set the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        return Timer(self, 'set')

    def set_function(self, f: Callable[[], float]) -> None:
        """Call the provided function to return the Gauge value.

        The function must return a float, and may be called from
        multiple threads. All other methods of the Gauge become NOOPs.
        """

        self._raise_if_not_observable()

        def callback_samples(_: Gauge) -> Iterable[Sample]:
            # Ignores the stored value; the reported value is whatever f()
            # returns at collection time.
            return (Sample('', {}, float(f()), None, None),)

        # Shadow the class-level _child_samples on this instance only.
        self._child_samples = types.MethodType(callback_samples, self)  # type: ignore

    def _child_samples(self) -> Iterable[Sample]:
        """Return the single unsuffixed sample holding the current value."""
        current = self._value.get()
        return (Sample('', {}, current, None, None),)

503 

504 

class Summary(MetricWrapperBase):
    """A Summary tracks the size and number of events.

    Typical uses:
    - Response latency
    - Request size

    Observing a value directly:

        from prometheus_client import Summary

        s = Summary('request_size_bytes', 'Request size (bytes)')
        s.observe(512)  # Observe 512 (bytes)

    Timing a function with the decorator form:

        from prometheus_client import Summary

        REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')

        @REQUEST_TIME.time()
        def create_response(request):
            '''A dummy function'''
            time.sleep(1)

    The same Summary object used as a context manager:

        with REQUEST_TIME.time():
            pass  # Logic to be timed
    """
    _type = 'summary'
    _reserved_labelnames = ['quantile']

    def _metric_init(self) -> None:
        """Create the ``_count`` and ``_sum`` value stores and stamp creation time."""
        self._count = values.ValueClass(
            self._type, self._name, self._name + '_count',
            self._labelnames, self._labelvalues, self._documentation,
        )
        self._sum = values.ValueClass(
            self._type, self._name, self._name + '_sum',
            self._labelnames, self._labelvalues, self._documentation,
        )
        self._created = time.time()

    def observe(self, amount: float) -> None:
        """Observe the given amount.

        The amount is usually positive or zero. Negative values are
        accepted but prevent current versions of Prometheus from
        properly detecting counter resets in the sum of
        observations. See
        https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
        for details.

        Each call adds one to the count and ``amount`` to the sum.
        """
        self._raise_if_not_observable()
        self._count.inc(1)
        self._sum.inc(amount)

    def time(self) -> Timer:
        """Time a block of code or function, and observe the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        return Timer(self, 'observe')

    def _child_samples(self) -> Iterable[Sample]:
        """Return ``_count`` and ``_sum`` samples, plus ``_created`` when enabled."""
        count_sample = Sample('_count', {}, self._count.get(), None, None)
        sum_sample = Sample('_sum', {}, self._sum.get(), None, None)
        if not _use_created:
            return (count_sample, sum_sample)
        created_sample = Sample('_created', {}, self._created, None, None)
        return (count_sample, sum_sample, created_sample)

573 

574 

class Histogram(MetricWrapperBase):
    """A Histogram tracks the size and number of events in buckets.

    Histograms allow aggregatable calculation of quantiles.

    Typical uses:
    - Response latency
    - Request size

    Observing a value directly:

        from prometheus_client import Histogram

        h = Histogram('request_size_bytes', 'Request size (bytes)')
        h.observe(512)  # Observe 512 (bytes)

    Timing a function with the decorator form:

        from prometheus_client import Histogram

        REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')

        @REQUEST_TIME.time()
        def create_response(request):
            '''A dummy function'''
            time.sleep(1)

    The same Histogram object used as a context manager:

        with REQUEST_TIME.time():
            pass  # Logic to be timed

    The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
    They can be overridden by passing `buckets` keyword argument to `Histogram`.
    """
    _type = 'histogram'
    _reserved_labelnames = ['le']
    DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
                 ):
        """Create a histogram with the given bucket upper bounds.

        ``buckets`` is validated first (see ``_prepare_buckets``); the
        remaining arguments are forwarded unchanged to the base class.
        """
        # Must run before the base initializer: _metric_init() reads
        # self._upper_bounds.
        self._prepare_buckets(buckets)
        super().__init__(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        # Forwarded to children created through labels().
        self._kwargs['buckets'] = buckets

    def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None:
        """Convert, validate and store the bucket upper bounds.

        Bounds are converted with ``float()``. ``INF`` is appended when the
        last bound is not already ``INF``.

        Raises:
            ValueError: if the bounds are not sorted ascending, or if fewer
                than two bounds remain after ``INF`` has been added.
        """
        bounds = [float(raw) for raw in source_buckets]
        if bounds != sorted(bounds):
            # This is probably an error on the part of the user,
            # so raise rather than sorting for them.
            raise ValueError('Buckets not in sorted order')
        if bounds and bounds[-1] != INF:
            bounds.append(INF)
        if len(bounds) < 2:
            raise ValueError('Must have at least two buckets')
        self._upper_bounds = bounds

    def _metric_init(self) -> None:
        """Create the ``_sum`` store and one ``_bucket`` store per upper bound."""
        self._buckets: List[values.ValueClass] = []
        self._created = time.time()
        # Bucket series carry the metric's own labels plus 'le'.
        bucket_labelnames = self._labelnames + ('le',)
        self._sum = values.ValueClass(
            self._type, self._name, self._name + '_sum',
            self._labelnames, self._labelvalues, self._documentation,
        )
        self._buckets.extend(
            values.ValueClass(
                self._type,
                self._name,
                self._name + '_bucket',
                bucket_labelnames,
                self._labelvalues + (floatToGoString(upper),),
                self._documentation,
            )
            for upper in self._upper_bounds
        )

    def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
        """Observe the given amount.

        The amount is usually positive or zero. Negative values are
        accepted but prevent current versions of Prometheus from
        properly detecting counter resets in the sum of
        observations. See
        https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
        for details.

        ``amount`` is added to the sum, and the first bucket whose upper
        bound is at least ``amount`` is incremented by one. When
        ``exemplar`` is given it is validated and attached to that bucket.
        """
        self._raise_if_not_observable()
        self._sum.inc(amount)
        # Only the first matching bucket is incremented here; the
        # cumulative counts are produced in _child_samples().
        for upper, bucket in zip(self._upper_bounds, self._buckets):
            if amount <= upper:
                bucket.inc(1)
                if exemplar:
                    _validate_exemplar(exemplar)
                    bucket.set_exemplar(Exemplar(exemplar, amount, time.time()))
                break

    def time(self) -> Timer:
        """Time a block of code or function, and observe the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        return Timer(self, 'observe')

    def _child_samples(self) -> Iterable[Sample]:
        """Return cumulative ``_bucket`` samples, ``_count``, and optional extras.

        ``_sum`` is only included when the lowest upper bound is
        non-negative; ``_created`` only when created series are enabled.
        """
        collected = []
        running_total = 0.0
        for upper, bucket in zip(self._upper_bounds, self._buckets):
            running_total += bucket.get()
            collected.append(
                Sample('_bucket', {'le': floatToGoString(upper)}, running_total, None, bucket.get_exemplar())
            )
        # After the last (INF) bucket the running total is the overall count.
        collected.append(Sample('_count', {}, running_total, None, None))
        if self._upper_bounds[0] >= 0:
            collected.append(Sample('_sum', {}, self._sum.get(), None, None))
        if _use_created:
            collected.append(Sample('_created', {}, self._created, None, None))
        return tuple(collected)

704 

705 

class Info(MetricWrapperBase):
    """Info metric, key-value pairs.

    Examples of Info include:
    - Build information
    - Version information
    - Potential target metadata

    Example usage:
        from prometheus_client import Info

        i = Info('my_build', 'Description of info')
        i.info({'version': '1.2.3', 'buildhost': 'foo@bar'})

    Info metrics do not work in multiprocess mode.
    """
    _type = 'info'

    def _metric_init(self):
        """Set up the empty key-value store and the lock guarding it."""
        # Kept as a set for the overlap check in info().
        self._labelname_set = set(self._labelnames)
        self._lock = Lock()
        self._value = {}

    def info(self, val: Dict[str, str]) -> None:
        """Set info metric.

        Args:
            val: Key-value pairs to expose. A copy is stored, replacing any
                previously set pairs.

        Raises:
            ValueError: if the metric is a labelled parent without label
                values, if a key of ``val`` is also one of the metric's
                label names, or if any value is None.
        """
        # Same guard as the other metric types' mutators: a labelled parent
        # was never initialised, so fail with the explicit "missing label
        # values" error rather than an AttributeError on _labelname_set.
        self._raise_if_not_observable()
        if self._labelname_set.intersection(val.keys()):
            raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format(
                self._labelnames, val))
        if any(i is None for i in val.values()):
            raise ValueError('Label value cannot be None')
        with self._lock:
            self._value = dict(val)

    def _child_samples(self) -> Iterable[Sample]:
        """Return one ``_info`` sample with value 1.0 and the stored pairs as labels."""
        with self._lock:
            return (Sample('_info', self._value, 1.0, None, None),)

742 

743 

class Enum(MetricWrapperBase):
    """Enum metric, which of a set of states is true.

    Example usage:
        from prometheus_client import Enum

        e = Enum('task_state', 'Description of enum',
            states=['starting', 'running', 'stopped'])
        e.state('running')

    The first listed state will be the default.
    Enum metrics do not work in multiprocess mode.
    """
    _type = 'stateset'

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Sequence[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 states: Optional[Sequence[str]] = None,
                 ):
        """Create an enum metric.

        Args:
            states: Non-empty sequence of state names; the first one is the
                initial state.

        The remaining arguments are forwarded unchanged to the base class.

        Raises:
            ValueError: if ``name`` is also one of ``labelnames``, or if
                ``states`` is empty or None.
        """
        # Validate BEFORE delegating: the base initializer registers the
        # metric with the registry, so rejecting the arguments afterwards
        # would leave a registered collector that has no _states.
        # Materialise once so a one-shot iterable is not consumed by the
        # membership test below.
        labelnames = tuple(labelnames)
        if name in labelnames:
            raise ValueError(f'Overlapping labels for Enum metric: {name}')
        if not states:
            raise ValueError(f'No states provided for Enum metric: {name}')
        super().__init__(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        # Stored on the instance and forwarded to children created through
        # labels().
        self._kwargs['states'] = self._states = states

    def _metric_init(self) -> None:
        """Start in the first state (index 0) and create the guarding lock."""
        self._value = 0
        self._lock = Lock()

    def state(self, state: str) -> None:
        """Set enum metric state.

        Raises:
            ValueError: if the metric is a labelled parent without label
                values, or if ``state`` is not one of the configured
                states (raised by the ``index`` lookup).
        """
        self._raise_if_not_observable()
        with self._lock:
            # The current state is stored as its index into _states.
            self._value = self._states.index(state)

    def _child_samples(self) -> Iterable[Sample]:
        """Return one sample per state: 1 for the current state, 0 otherwise.

        Each sample is labelled with the metric's own name as the label
        name and the state as the label value.
        """
        with self._lock:
            return [
                Sample('', {self._name: s}, 1 if i == self._value else 0, None, None)
                for i, s
                in enumerate(self._states)
            ]