Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/metrics.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

291 statements  

1import os 

2from threading import Lock 

3import time 

4import types 

5from typing import ( 

6 Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, 

7 Type, TypeVar, Union, 

8) 

9import warnings 

10 

11from . import values # retain this import style for testability 

12from .context_managers import ExceptionCounter, InprogressTracker, Timer 

13from .metrics_core import Metric 

14from .registry import Collector, CollectorRegistry, REGISTRY 

15from .samples import Exemplar, Sample 

16from .utils import floatToGoString, INF 

17from .validation import ( 

18 _validate_exemplar, _validate_labelnames, _validate_metric_name, 

19) 

20 

21T = TypeVar('T', bound='MetricWrapperBase') 

22F = TypeVar("F", bound=Callable[..., Any]) 

23 

24 

def _build_full_name(metric_type, name, namespace, subsystem, unit):
    """Assemble the full metric name from its parts.

    The result is ``[namespace_][subsystem_]name[_unit]``. Empty (falsy)
    ``namespace``/``subsystem`` values are skipped. For counters a trailing
    ``_total`` is removed, and the unit suffix is only appended when the
    name does not already end with it.

    Raises:
        ValueError: if ``name`` is empty, or if a unit is supplied for an
            ``info`` or ``stateset`` metric.
    """
    if not name:
        raise ValueError('Metric name should not be empty')

    # Only non-empty prefixes contribute a "<prefix>_" segment.
    segments = [prefix for prefix in (namespace, subsystem) if prefix]
    segments.append(name)
    full_name = '_'.join(segments)

    if metric_type == 'counter' and full_name.endswith('_total'):
        # Munge to OpenMetrics: the '_total' suffix is added back on the sample.
        full_name = full_name[:-len('_total')]

    if unit:
        unit_suffix = '_' + unit
        if not full_name.endswith(unit_suffix):
            full_name += unit_suffix
        # The unit is appended first so the error message shows the final name.
        if metric_type in ('info', 'stateset'):
            raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name)

    return full_name

41 

42 

43 

def _get_use_created() -> bool:
    """Return whether ``_created`` series should be exported.

    Reads the ``PROMETHEUS_DISABLE_CREATED_SERIES`` environment variable;
    the values ``true``, ``1`` and ``t`` (case-insensitive) disable the
    series. Any other value, or an unset variable, leaves them enabled.
    """
    raw_setting = os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False')
    disabled = raw_setting.lower() in ('true', '1', 't')
    return not disabled


# Evaluated once when the module is imported; later changes go through
# disable_created_metrics() / enable_created_metrics().
_use_created = _get_use_created()

49 

50 

def disable_created_metrics() -> None:
    """Disable exporting _created metrics on counters, histograms, and summaries.

    Sets the module-level ``_use_created`` flag to ``False``; the flag is
    consulted each time samples are produced.
    """
    global _use_created
    _use_created = False

55 

56 

def enable_created_metrics() -> None:
    """Enable exporting _created metrics on counters, histograms, and summaries.

    Sets the module-level ``_use_created`` flag to ``True``; the flag is
    consulted each time samples are produced.
    """
    global _use_created
    _use_created = True

61 

62 

class MetricWrapperBase(Collector):
    """Shared implementation for the metric types defined in this module.

    Subclasses set ``_type`` and implement ``_metric_init()`` and
    ``_child_samples()``. A metric constructed with label names but without
    label values acts as a *parent*: it keeps one child per label-value tuple
    in ``_metrics`` (guarded by ``_lock``) and hands children out through
    ``labels()``. A metric without label names, or a child with label values,
    is *observable* and has its value storage initialised by ``_metric_init()``.
    """
    _type: Optional[str] = None
    _reserved_labelnames: Sequence[str] = ()

    def _is_observable(self) -> bool:
        # Whether this metric is observable, i.e.
        # * a metric without label names and values, or
        # * the child of a labelled metric.
        return bool(not self._labelnames or self._labelvalues)

    def _raise_if_not_observable(self) -> None:
        # Functions that mutate the state of the metric, for example incrementing
        # a counter, will fail if the metric is not observable, because only if a
        # metric is observable will the value be initialized.
        if not self._is_observable():
            raise ValueError('%s metric is missing label values' % str(self._type))

    def _is_parent(self) -> bool:
        # A parent has label names but no label values of its own.
        return bool(self._labelnames and not self._labelvalues)

    def _get_metric(self):
        """Build an empty ``Metric`` carrying this wrapper's name, help, type and unit."""
        return Metric(self._name, self._documentation, self._type, self._unit)

    def describe(self) -> Iterable[Metric]:
        """Return the metric family without any samples."""
        return [self._get_metric()]

    def collect(self) -> Iterable[Metric]:
        """Return the metric family populated with the current samples."""
        metric = self._get_metric()
        for suffix, labels, value, timestamp, exemplar, native_histogram_value in self._samples():
            metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar, native_histogram_value)
        return [metric]

    def __str__(self) -> str:
        return f"{self._type}:{self._name}"

    def __repr__(self) -> str:
        metric_type = type(self)
        return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"

    def __init__(self: T,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 ) -> None:
        """Create the metric and, unless it is a child, register it.

        ``_labelvalues`` is only supplied by ``labels()`` when it creates a
        child; user code is expected to leave it unset.
        """
        self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
        self._labelnames = _validate_labelnames(self, labelnames)
        self._labelvalues = tuple(_labelvalues or ())
        # Extra constructor kwargs that subclasses want forwarded to children.
        self._kwargs: Dict[str, Any] = {}
        self._documentation = documentation
        self._unit = unit

        _validate_metric_name(self._name)

        if self._is_parent():
            # Prepare the fields needed for child metrics.
            self._lock = Lock()
            self._metrics: Dict[Sequence[str], T] = {}

        if self._is_observable():
            self._metric_init()

        if not self._labelvalues:
            # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
            if registry:
                registry.register(self)

    def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
        """Return the child for the given labelset.

        All metrics can have labels, allowing grouping of related time series.
        Taking a counter as an example:

            from prometheus_client import Counter

            c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
            c.labels('get', '/').inc()
            c.labels('post', '/submit').inc()

        Labels can also be provided as keyword arguments:

            from prometheus_client import Counter

            c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
            c.labels(method='get', endpoint='/').inc()
            c.labels(method='post', endpoint='/submit').inc()

        See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
        and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
        """
        if not self._labelnames:
            raise ValueError('No label names were set when constructing %s' % self)

        if self._labelvalues:
            raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format(
                self,
                dict(zip(self._labelnames, self._labelvalues))
            ))

        if labelvalues and labelkwargs:
            raise ValueError("Can't pass both *args and **kwargs")

        if labelkwargs:
            if sorted(labelkwargs) != sorted(self._labelnames):
                raise ValueError('Incorrect label names')
            # Order the values by the declared label names, not by kwarg order.
            labelvalues = tuple(str(labelkwargs[label]) for label in self._labelnames)
        else:
            if len(labelvalues) != len(self._labelnames):
                raise ValueError('Incorrect label count')
            labelvalues = tuple(str(value) for value in labelvalues)
        with self._lock:
            if labelvalues not in self._metrics:
                # The child reuses the already-built full name; passing
                # _labelvalues keeps it from being registered on its own.
                self._metrics[labelvalues] = self.__class__(
                    self._name,
                    documentation=self._documentation,
                    labelnames=self._labelnames,
                    unit=self._unit,
                    _labelvalues=labelvalues,
                    **self._kwargs
                )
            return self._metrics[labelvalues]

    def remove(self, *labelvalues: Any) -> None:
        """Remove the given labelset from the metric.

        Removing a labelset that does not exist is a no-op. A ``UserWarning``
        is emitted when a multiprocess directory is configured in the
        environment.

        Raises:
            ValueError: if the metric has no label names, or the number of
                values does not match the number of label names.
        """
        if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
            warnings.warn(
                "Removal of labels has not been implemented in multi-process mode yet.",
                UserWarning)

        if not self._labelnames:
            raise ValueError('No label names were set when constructing %s' % self)

        if len(labelvalues) != len(self._labelnames):
            raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
        labelvalues = tuple(str(value) for value in labelvalues)
        with self._lock:
            if labelvalues in self._metrics:
                del self._metrics[labelvalues]

    def clear(self) -> None:
        """Remove all labelsets from the metric"""
        if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
            warnings.warn(
                "Clearing labels has not been implemented in multi-process mode yet",
                UserWarning)
        with self._lock:
            self._metrics = {}

    def _samples(self) -> Iterable[Sample]:
        """Dispatch to the parent or child sample producer."""
        if self._is_parent():
            return self._multi_samples()
        else:
            return self._child_samples()

    def _multi_samples(self) -> Iterable[Sample]:
        """Yield every child's samples with the child's label values merged in."""
        # Snapshot under the lock so children can be added/removed while iterating.
        with self._lock:
            metrics = self._metrics.copy()
        for labels, metric in metrics.items():
            series_labels = list(zip(self._labelnames, labels))
            for suffix, sample_labels, value, timestamp, exemplar, native_histogram_value in metric._samples():
                # Sample-specific labels come last, so they win on key collisions.
                yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar, native_histogram_value)

    def _child_samples(self) -> Iterable[Sample]:  # pragma: no cover
        """Return the samples of an observable metric; subclasses must override."""
        raise NotImplementedError('_child_samples() must be implemented by %r' % self)

    def _metric_init(self):  # pragma: no cover
        """
        Initialize the metric object as a child, i.e. when it has labels (if any) set.

        This is factored as a separate function to allow for deferred initialization.
        """
        raise NotImplementedError('_metric_init() must be implemented by %r' % self)

239 

240 

class Counter(MetricWrapperBase):
    """A Counter tracks counts of events or running totals.

    Example use cases for Counters:
    - Number of requests processed
    - Number of items that were inserted into a queue
    - Total amount of data that a system has processed

    Counters can only go up (and be reset when the process restarts). If your use case can go down,
    you should use a Gauge instead.

    An example for a Counter:

        from prometheus_client import Counter

        c = Counter('my_failures_total', 'Description of counter')
        c.inc()     # Increment by 1
        c.inc(1.6)  # Increment by given value

    There are utilities to count exceptions raised:

        @c.count_exceptions()
        def f():
            pass

        with c.count_exceptions():
            pass

        # Count only one type of exception
        with c.count_exceptions(ValueError):
            pass

    You can also reset the counter to zero in case your logical "process" restarts
    without restarting the actual python process.

        c.reset()

    """
    _type = 'counter'

    def _metric_init(self) -> None:
        """Create the value storage and record the creation time."""
        self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
                                        self._labelvalues, self._documentation)
        self._created = time.time()

    def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
        """Increment counter by the given amount.

        Raises:
            ValueError: if the metric is missing label values, if ``amount``
                is negative, or if ``_validate_exemplar`` rejects ``exemplar``.
        """
        self._raise_if_not_observable()
        if amount < 0:
            raise ValueError('Counters can only be incremented by non-negative amounts.')
        if exemplar:
            # Validate before touching the value so that a rejected exemplar
            # does not leave the counter incremented.
            _validate_exemplar(exemplar)
        self._value.inc(amount)
        if exemplar:
            self._value.set_exemplar(Exemplar(exemplar, amount, time.time()))

    def reset(self) -> None:
        """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process.

        Raises:
            ValueError: if the metric is missing label values.
        """
        # Same guard as inc(): a labelled parent has no value storage to reset.
        self._raise_if_not_observable()
        self._value.set(0)
        self._created = time.time()

    def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter:
        """Count exceptions in a block of code or function.

        Can be used as a function decorator or context manager.
        Increments the counter when an exception of the given
        type is raised up out of the code.
        """
        self._raise_if_not_observable()
        return ExceptionCounter(self, exception)

    def _child_samples(self) -> Iterable[Sample]:
        """Return the ``_total`` sample and, when enabled, the ``_created`` sample."""
        sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar())
        if _use_created:
            return (
                sample,
                Sample('_created', {}, self._created, None, None)
            )
        return (sample,)

319 

320 

class Gauge(MetricWrapperBase):
    """Gauge metric, to report instantaneous values.

    Examples of Gauges include:
    - Inprogress requests
    - Number of items in a queue
    - Free memory
    - Total memory
    - Temperature

    Gauges can go both up and down.

        from prometheus_client import Gauge

        g = Gauge('my_inprogress_requests', 'Description of gauge')
        g.inc()      # Increment by 1
        g.dec(10)    # Decrement by given value
        g.set(4.2)   # Set to a given value

    There are utilities for common use cases:

        g.set_to_current_time()   # Set to current unixtime

        # Increment when entered, decrement when exited.
        @g.track_inprogress()
        def f():
            pass

        with g.track_inprogress():
            pass

    A Gauge can also take its value from a callback:

        d = Gauge('data_objects', 'Number of objects')
        my_dict = {}
        d.set_function(lambda: len(my_dict))
    """
    _type = 'gauge'
    _MULTIPROC_MODES = frozenset((
        'all', 'liveall',
        'min', 'livemin',
        'max', 'livemax',
        'sum', 'livesum',
        'mostrecent', 'livemostrecent',
    ))
    _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
                 ):
        """Create a gauge; ``multiprocess_mode`` must be one of ``_MULTIPROC_MODES``."""
        # Stored before the base constructor runs because _metric_init() reads it.
        self._multiprocess_mode = multiprocess_mode
        mode_is_known = multiprocess_mode in self._MULTIPROC_MODES
        if not mode_is_known:
            raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
        base_arguments = dict(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        super().__init__(**base_arguments)
        # Forwarded to children created through labels().
        self._kwargs['multiprocess_mode'] = self._multiprocess_mode
        self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES

    def _metric_init(self) -> None:
        """Create the value storage for this gauge."""
        self._value = values.ValueClass(
            self._type,
            self._name,
            self._name,
            self._labelnames,
            self._labelvalues,
            self._documentation,
            multiprocess_mode=self._multiprocess_mode,
        )

    def inc(self, amount: float = 1) -> None:
        """Increment gauge by the given amount."""
        if self._is_most_recent:
            raise RuntimeError("inc must not be used with the mostrecent mode")
        self._raise_if_not_observable()
        self._value.inc(amount)

    def dec(self, amount: float = 1) -> None:
        """Decrement gauge by the given amount."""
        if self._is_most_recent:
            raise RuntimeError("dec must not be used with the mostrecent mode")
        self._raise_if_not_observable()
        # A decrement is stored as a negative increment.
        self._value.inc(-amount)

    def set(self, value: float) -> None:
        """Set gauge to the given value."""
        self._raise_if_not_observable()
        if not self._is_most_recent:
            self._value.set(float(value))
            return
        # The "most recent" modes additionally record when the value was set.
        self._value.set(float(value), timestamp=time.time())

    def set_to_current_time(self) -> None:
        """Set gauge to the current unixtime."""
        now = time.time()
        self.set(now)

    def track_inprogress(self) -> InprogressTracker:
        """Track inprogress blocks of code or functions.

        Can be used as a function decorator or context manager.
        Increments the gauge when the code is entered,
        and decrements when it is exited.
        """
        self._raise_if_not_observable()
        tracker = InprogressTracker(self)
        return tracker

    def time(self) -> Timer:
        """Time a block of code or function, and set the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        timer = Timer(self, 'set')
        return timer

    def set_function(self, f: Callable[[], float]) -> None:
        """Call the provided function to return the Gauge value.

        The function must return a float, and may be called from
        multiple threads. All other methods of the Gauge become NOOPs.
        """
        self._raise_if_not_observable()

        def samples(_: Gauge) -> Iterable[Sample]:
            current = float(f())
            return (Sample('', {}, current, None, None),)

        # Shadow the class-level _child_samples on this instance only.
        self._child_samples = types.MethodType(samples, self)  # type: ignore

    def _child_samples(self) -> Iterable[Sample]:
        """Return the single sample holding the gauge's current value."""
        current = self._value.get()
        return (Sample('', {}, current, None, None),)

454 

455 

class Summary(MetricWrapperBase):
    """A Summary tracks the size and number of events.

    Example use cases for Summaries:
    - Response latency
    - Request size

    Example for a Summary:

        from prometheus_client import Summary

        s = Summary('request_size_bytes', 'Request size (bytes)')
        s.observe(512)  # Observe 512 (bytes)

    Example for a Summary using time:

        from prometheus_client import Summary

        REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')

        @REQUEST_TIME.time()
        def create_response(request):
            '''A dummy function'''
            time.sleep(1)

    Example for using the same Summary object as a context manager:

        with REQUEST_TIME.time():
            pass  # Logic to be timed
    """
    _type = 'summary'
    _reserved_labelnames = ['quantile']

    def _metric_init(self) -> None:
        """Create the count and sum storage and record the creation time."""
        self._count = values.ValueClass(
            self._type, self._name, self._name + '_count',
            self._labelnames, self._labelvalues, self._documentation,
        )
        self._sum = values.ValueClass(
            self._type, self._name, self._name + '_sum',
            self._labelnames, self._labelvalues, self._documentation,
        )
        self._created = time.time()

    def observe(self, amount: float) -> None:
        """Observe the given amount.

        The amount is usually positive or zero. Negative values are
        accepted but prevent current versions of Prometheus from
        properly detecting counter resets in the sum of
        observations. See
        https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
        for details.
        """
        self._raise_if_not_observable()
        # One more observation, and its size added to the running sum.
        self._count.inc(1)
        self._sum.inc(amount)

    def time(self) -> Timer:
        """Time a block of code or function, and observe the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        timer = Timer(self, 'observe')
        return timer

    def _child_samples(self) -> Iterable[Sample]:
        """Return ``_count`` and ``_sum`` samples, plus ``_created`` when enabled."""
        count_sample = Sample('_count', {}, self._count.get(), None, None)
        sum_sample = Sample('_sum', {}, self._sum.get(), None, None)
        if not _use_created:
            return (count_sample, sum_sample)
        created_sample = Sample('_created', {}, self._created, None, None)
        return (count_sample, sum_sample, created_sample)

524 

525 

class Histogram(MetricWrapperBase):
    """A Histogram tracks the size and number of events in buckets.

    You can use Histograms for aggregatable calculation of quantiles.

    Example use cases:
    - Response latency
    - Request size

    Example for a Histogram:

        from prometheus_client import Histogram

        h = Histogram('request_size_bytes', 'Request size (bytes)')
        h.observe(512)  # Observe 512 (bytes)

    Example for a Histogram using time:

        from prometheus_client import Histogram

        REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')

        @REQUEST_TIME.time()
        def create_response(request):
            '''A dummy function'''
            time.sleep(1)

    Example of using the same Histogram object as a context manager:

        with REQUEST_TIME.time():
            pass  # Logic to be timed

    The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
    They can be overridden by passing `buckets` keyword argument to `Histogram`.
    """
    _type = 'histogram'
    _reserved_labelnames = ['le']
    DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Iterable[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
                 ):
        """Create a histogram with the given bucket upper bounds.

        Raises:
            ValueError: if ``buckets`` is unsorted or yields fewer than two
                buckets once ``+Inf`` has been appended.
        """
        # Buckets are validated first: _metric_init(), invoked from the base
        # constructor, reads self._upper_bounds.
        self._prepare_buckets(buckets)
        super().__init__(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        # Forwarded to children created through labels().
        self._kwargs['buckets'] = buckets

    def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None:
        """Convert, validate and store the bucket upper bounds in ``_upper_bounds``."""
        buckets = [float(b) for b in source_buckets]
        if buckets != sorted(buckets):
            # This is probably an error on the part of the user,
            # so raise rather than sorting for them.
            raise ValueError('Buckets not in sorted order')
        if buckets and buckets[-1] != INF:
            # Every observation must fall into some bucket.
            buckets.append(INF)
        if len(buckets) < 2:
            raise ValueError('Must have at least two buckets')
        self._upper_bounds = buckets

    def _metric_init(self) -> None:
        """Create the sum storage and one storage per bucket."""
        self._buckets: List[values.ValueClass] = []
        self._created = time.time()
        bucket_labelnames = self._labelnames + ('le',)
        self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
        for b in self._upper_bounds:
            self._buckets.append(values.ValueClass(
                self._type,
                self._name,
                self._name + '_bucket',
                bucket_labelnames,
                self._labelvalues + (floatToGoString(b),),
                self._documentation)
            )

    def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
        """Observe the given amount.

        The amount is usually positive or zero. Negative values are
        accepted but prevent current versions of Prometheus from
        properly detecting counter resets in the sum of
        observations. See
        https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
        for details.

        Raises:
            ValueError: if the metric is missing label values, or if
                ``_validate_exemplar`` rejects ``exemplar``.
        """
        self._raise_if_not_observable()
        if exemplar:
            # Validate before mutating so a rejected exemplar does not leave
            # the sum and a bucket already updated.
            _validate_exemplar(exemplar)
        self._sum.inc(amount)
        # Each bucket stores only its own observations; they are accumulated
        # into cumulative counts in _child_samples().
        for bound, bucket in zip(self._upper_bounds, self._buckets):
            if amount <= bound:
                bucket.inc(1)
                if exemplar:
                    bucket.set_exemplar(Exemplar(exemplar, amount, time.time()))
                break

    def time(self) -> Timer:
        """Time a block of code or function, and observe the duration in seconds.

        Can be used as a function decorator or context manager.
        """
        return Timer(self, 'observe')

    def _child_samples(self) -> Iterable[Sample]:
        """Return cumulative ``_bucket`` samples followed by ``_count``, ``_sum`` and ``_created``."""
        samples = []
        acc = 0.0
        for i, bound in enumerate(self._upper_bounds):
            acc += self._buckets[i].get()
            samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar()))
        # After the last (+Inf) bucket the accumulator holds the total count.
        samples.append(Sample('_count', {}, acc, None, None))
        # The sum is only exported when the lowest bucket bound is non-negative.
        if self._upper_bounds[0] >= 0:
            samples.append(Sample('_sum', {}, self._sum.get(), None, None))
        if _use_created:
            samples.append(Sample('_created', {}, self._created, None, None))
        return tuple(samples)

655 

656 

class Info(MetricWrapperBase):
    """Info metric, key-value pairs.

    Examples of Info include:
    - Build information
    - Version information
    - Potential target metadata

    Example usage:
        from prometheus_client import Info

        i = Info('my_build', 'Description of info')
        i.info({'version': '1.2.3', 'buildhost': 'foo@bar'})

    Info metrics do not work in multiprocess mode.
    """
    _type = 'info'

    def _metric_init(self):
        """Set up the label-name set, the lock and the empty value dict."""
        self._labelname_set = set(self._labelnames)
        self._lock = Lock()
        self._value = {}

    def info(self, val: Dict[str, str]) -> None:
        """Set info metric.

        Raises:
            ValueError: if the metric is missing label values, if a key of
                ``val`` is also one of the metric's label names, or if any
                value in ``val`` is ``None``.
        """
        # Same guard as the other mutators in this module: a labelled parent
        # has no _labelname_set/_value to work with.
        self._raise_if_not_observable()
        if self._labelname_set.intersection(val.keys()):
            raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format(
                self._labelnames, val))
        if any(i is None for i in val.values()):
            raise ValueError('Label value cannot be None')
        with self._lock:
            # Store a copy so later changes to the caller's dict are not picked up.
            self._value = dict(val)

    def _child_samples(self) -> Iterable[Sample]:
        """Return the single ``_info`` sample whose labels are the stored key-value pairs."""
        with self._lock:
            return (Sample('_info', self._value, 1.0, None, None),)

693 

694 

class Enum(MetricWrapperBase):
    """Enum metric, which of a set of states is true.

    Example usage:
        from prometheus_client import Enum

        e = Enum('task_state', 'Description of enum',
                 states=['starting', 'running', 'stopped'])
        e.state('running')

    The first listed state will be the default.
    Enum metrics do not work in multiprocess mode.
    """
    _type = 'stateset'

    def __init__(self,
                 name: str,
                 documentation: str,
                 labelnames: Sequence[str] = (),
                 namespace: str = '',
                 subsystem: str = '',
                 unit: str = '',
                 registry: Optional[CollectorRegistry] = REGISTRY,
                 _labelvalues: Optional[Sequence[str]] = None,
                 states: Optional[Sequence[str]] = None,
                 ):
        """Create an enum metric over the given ``states``.

        Raises:
            ValueError: if ``name`` is also one of ``labelnames`` or if
                ``states`` is empty or ``None``.
        """
        # Validate before calling the base constructor: it registers the
        # metric with the registry, so failing afterwards would leave a
        # registered collector that has no _states to report.
        if name in labelnames:
            raise ValueError(f'Overlapping labels for Enum metric: {name}')
        if not states:
            raise ValueError(f'No states provided for Enum metric: {name}')
        self._states = states
        super().__init__(
            name=name,
            documentation=documentation,
            labelnames=labelnames,
            namespace=namespace,
            subsystem=subsystem,
            unit=unit,
            registry=registry,
            _labelvalues=_labelvalues,
        )
        # Forwarded to children created through labels(); _kwargs only exists
        # once the base constructor has run.
        self._kwargs['states'] = states

    def _metric_init(self) -> None:
        """Select the first state (index 0) and create the lock."""
        self._value = 0
        self._lock = Lock()

    def state(self, state: str) -> None:
        """Set enum metric state."""
        self._raise_if_not_observable()
        with self._lock:
            # The current state is stored as its index in _states.
            self._value = self._states.index(state)

    def _child_samples(self) -> Iterable[Sample]:
        """Return one sample per state: 1 for the current state, 0 for the others."""
        with self._lock:
            return [
                Sample('', {self._name: s}, 1 if i == self._value else 0, None, None)
                for i, s
                in enumerate(self._states)
            ]