1import os
2from threading import Lock
3import time
4import types
5from typing import (
6 Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple,
7 Type, TypeVar, Union,
8)
9import warnings
10
11from . import values # retain this import style for testability
12from .context_managers import ExceptionCounter, InprogressTracker, Timer
13from .metrics_core import Metric
14from .registry import Collector, CollectorRegistry, REGISTRY
15from .samples import Exemplar, Sample
16from .utils import floatToGoString, INF
17from .validation import (
18 _validate_exemplar, _validate_labelnames, _validate_metric_name,
19)
20
21T = TypeVar('T', bound='MetricWrapperBase')
22F = TypeVar("F", bound=Callable[..., Any])
23
24
25def _build_full_name(metric_type, name, namespace, subsystem, unit):
26 if not name:
27 raise ValueError('Metric name should not be empty')
28 full_name = ''
29 if namespace:
30 full_name += namespace + '_'
31 if subsystem:
32 full_name += subsystem + '_'
33 full_name += name
34 if metric_type == 'counter' and full_name.endswith('_total'):
35 full_name = full_name[:-6] # Munge to OpenMetrics.
36 if unit and not full_name.endswith("_" + unit):
37 full_name += "_" + unit
38 if unit and metric_type in ('info', 'stateset'):
39 raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name)
40 return full_name
41
42
43
44def _get_use_created() -> bool:
45 return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't')
46
47
48_use_created = _get_use_created()
49
50
51def disable_created_metrics():
52 """Disable exporting _created metrics on counters, histograms, and summaries."""
53 global _use_created
54 _use_created = False
55
56
57def enable_created_metrics():
58 """Enable exporting _created metrics on counters, histograms, and summaries."""
59 global _use_created
60 _use_created = True
61
62
63class MetricWrapperBase(Collector):
64 _type: Optional[str] = None
65 _reserved_labelnames: Sequence[str] = ()
66
67 def _is_observable(self):
68 # Whether this metric is observable, i.e.
69 # * a metric without label names and values, or
70 # * the child of a labelled metric.
71 return not self._labelnames or (self._labelnames and self._labelvalues)
72
73 def _raise_if_not_observable(self):
74 # Functions that mutate the state of the metric, for example incrementing
75 # a counter, will fail if the metric is not observable, because only if a
76 # metric is observable will the value be initialized.
77 if not self._is_observable():
78 raise ValueError('%s metric is missing label values' % str(self._type))
79
80 def _is_parent(self):
81 return self._labelnames and not self._labelvalues
82
83 def _get_metric(self):
84 return Metric(self._name, self._documentation, self._type, self._unit)
85
86 def describe(self) -> Iterable[Metric]:
87 return [self._get_metric()]
88
89 def collect(self) -> Iterable[Metric]:
90 metric = self._get_metric()
91 for suffix, labels, value, timestamp, exemplar, native_histogram_value in self._samples():
92 metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar, native_histogram_value)
93 return [metric]
94
95 def __str__(self) -> str:
96 return f"{self._type}:{self._name}"
97
98 def __repr__(self) -> str:
99 metric_type = type(self)
100 return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"
101
102 def __init__(self: T,
103 name: str,
104 documentation: str,
105 labelnames: Iterable[str] = (),
106 namespace: str = '',
107 subsystem: str = '',
108 unit: str = '',
109 registry: Optional[CollectorRegistry] = REGISTRY,
110 _labelvalues: Optional[Sequence[str]] = None,
111 ) -> None:
112
113 self._original_name = name
114 self._namespace = namespace
115 self._subsystem = subsystem
116 self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
117 self._labelnames = _validate_labelnames(self, labelnames)
118 self._labelvalues = tuple(_labelvalues or ())
119 self._kwargs: Dict[str, Any] = {}
120 self._documentation = documentation
121 self._unit = unit
122
123 _validate_metric_name(self._name)
124
125 if self._is_parent():
126 # Prepare the fields needed for child metrics.
127 self._lock = Lock()
128 self._metrics: Dict[Sequence[str], T] = {}
129
130 if self._is_observable():
131 self._metric_init()
132
133 if not self._labelvalues:
134 # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
135 if registry:
136 registry.register(self)
137
138 def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
139 """Return the child for the given labelset.
140
141 All metrics can have labels, allowing grouping of related time series.
142 Taking a counter as an example:
143
144 from prometheus_client import Counter
145
146 c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
147 c.labels('get', '/').inc()
148 c.labels('post', '/submit').inc()
149
150 Labels can also be provided as keyword arguments:
151
152 from prometheus_client import Counter
153
154 c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
155 c.labels(method='get', endpoint='/').inc()
156 c.labels(method='post', endpoint='/submit').inc()
157
158 See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
159 and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
160 """
161 if not self._labelnames:
162 raise ValueError('No label names were set when constructing %s' % self)
163
164 if self._labelvalues:
165 raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format(
166 self,
167 dict(zip(self._labelnames, self._labelvalues))
168 ))
169
170 if labelvalues and labelkwargs:
171 raise ValueError("Can't pass both *args and **kwargs")
172
173 if labelkwargs:
174 if sorted(labelkwargs) != sorted(self._labelnames):
175 raise ValueError('Incorrect label names')
176 labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames)
177 else:
178 if len(labelvalues) != len(self._labelnames):
179 raise ValueError('Incorrect label count')
180 labelvalues = tuple(str(l) for l in labelvalues)
181 with self._lock:
182 if labelvalues not in self._metrics:
183
184 original_name = getattr(self, '_original_name', self._name)
185 namespace = getattr(self, '_namespace', '')
186 subsystem = getattr(self, '_subsystem', '')
187 unit = getattr(self, '_unit', '')
188
189 child_kwargs = dict(self._kwargs) if self._kwargs else {}
190 for k in ('namespace', 'subsystem', 'unit'):
191 child_kwargs.pop(k, None)
192
193 self._metrics[labelvalues] = self.__class__(
194 original_name,
195 documentation=self._documentation,
196 labelnames=self._labelnames,
197 namespace=namespace,
198 subsystem=subsystem,
199 unit=unit,
200 _labelvalues=labelvalues,
201 **child_kwargs
202 )
203 return self._metrics[labelvalues]
204
205 def remove(self, *labelvalues: Any) -> None:
206 if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
207 warnings.warn(
208 "Removal of labels has not been implemented in multi-process mode yet.",
209 UserWarning)
210
211 if not self._labelnames:
212 raise ValueError('No label names were set when constructing %s' % self)
213
214 """Remove the given labelset from the metric."""
215 if len(labelvalues) != len(self._labelnames):
216 raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
217 labelvalues = tuple(str(l) for l in labelvalues)
218 with self._lock:
219 if labelvalues in self._metrics:
220 del self._metrics[labelvalues]
221
222 def remove_by_labels(self, labels: dict[str, str]) -> None:
223 """Remove all series whose labelset partially matches the given labels."""
224 if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
225 warnings.warn(
226 "Removal of labels has not been implemented in multi-process mode yet.",
227 UserWarning
228 )
229
230 if not self._labelnames:
231 raise ValueError('No label names were set when constructing %s' % self)
232
233 if not isinstance(labels, dict):
234 raise TypeError("labels must be a dict of {label_name: label_value}")
235
236 if not labels:
237 return # no operation
238
239 invalid = [k for k in labels.keys() if k not in self._labelnames]
240 if invalid:
241 raise ValueError(
242 'Unknown label names: %s; expected %s' % (invalid, self._labelnames)
243 )
244
245 pos_filter = {self._labelnames.index(k): str(v) for k, v in labels.items()}
246
247 with self._lock:
248 # list(...) to avoid "dictionary changed size during iteration"
249 for lv in list(self._metrics.keys()):
250 if all(lv[pos] == want for pos, want in pos_filter.items()):
251 # pop with default avoids KeyError if concurrently removed
252 self._metrics.pop(lv, None)
253
254
255 def clear(self) -> None:
256 """Remove all labelsets from the metric"""
257 if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
258 warnings.warn(
259 "Clearing labels has not been implemented in multi-process mode yet",
260 UserWarning)
261 with self._lock:
262 self._metrics = {}
263
264 def _samples(self) -> Iterable[Sample]:
265 if self._is_parent():
266 return self._multi_samples()
267 else:
268 return self._child_samples()
269
270 def _multi_samples(self) -> Iterable[Sample]:
271 with self._lock:
272 metrics = self._metrics.copy()
273 for labels, metric in metrics.items():
274 series_labels = list(zip(self._labelnames, labels))
275 for suffix, sample_labels, value, timestamp, exemplar, native_histogram_value in metric._samples():
276 yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar, native_histogram_value)
277
278 def _child_samples(self) -> Iterable[Sample]: # pragma: no cover
279 raise NotImplementedError('_child_samples() must be implemented by %r' % self)
280
281 def _metric_init(self): # pragma: no cover
282 """
283 Initialize the metric object as a child, i.e. when it has labels (if any) set.
284
285 This is factored as a separate function to allow for deferred initialization.
286 """
287 raise NotImplementedError('_metric_init() must be implemented by %r' % self)
288
289
290class Counter(MetricWrapperBase):
291 """A Counter tracks counts of events or running totals.
292
293 Example use cases for Counters:
294 - Number of requests processed
295 - Number of items that were inserted into a queue
296 - Total amount of data that a system has processed
297
298 Counters can only go up (and be reset when the process restarts). If your use case can go down,
299 you should use a Gauge instead.
300
301 An example for a Counter:
302
303 from prometheus_client import Counter
304
305 c = Counter('my_failures_total', 'Description of counter')
306 c.inc() # Increment by 1
307 c.inc(1.6) # Increment by given value
308
309 There are utilities to count exceptions raised:
310
311 @c.count_exceptions()
312 def f():
313 pass
314
315 with c.count_exceptions():
316 pass
317
318 # Count only one type of exception
319 with c.count_exceptions(ValueError):
320 pass
321
322 You can also reset the counter to zero in case your logical "process" restarts
323 without restarting the actual python process.
324
325 c.reset()
326
327 """
328 _type = 'counter'
329
330 def _metric_init(self) -> None:
331 self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
332 self._labelvalues, self._documentation)
333 self._created = time.time()
334
335 def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
336 """Increment counter by the given amount."""
337 self._raise_if_not_observable()
338 if amount < 0:
339 raise ValueError('Counters can only be incremented by non-negative amounts.')
340 self._value.inc(amount)
341 if exemplar:
342 _validate_exemplar(exemplar)
343 self._value.set_exemplar(Exemplar(exemplar, amount, time.time()))
344
345 def reset(self) -> None:
346 """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process."""
347 self._value.set(0)
348 self._created = time.time()
349
350 def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter:
351 """Count exceptions in a block of code or function.
352
353 Can be used as a function decorator or context manager.
354 Increments the counter when an exception of the given
355 type is raised up out of the code.
356 """
357 self._raise_if_not_observable()
358 return ExceptionCounter(self, exception)
359
360 def _child_samples(self) -> Iterable[Sample]:
361 sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar())
362 if _use_created:
363 return (
364 sample,
365 Sample('_created', {}, self._created, None, None)
366 )
367 return (sample,)
368
369
370class Gauge(MetricWrapperBase):
371 """Gauge metric, to report instantaneous values.
372
373 Examples of Gauges include:
374 - Inprogress requests
375 - Number of items in a queue
376 - Free memory
377 - Total memory
378 - Temperature
379
380 Gauges can go both up and down.
381
382 from prometheus_client import Gauge
383
384 g = Gauge('my_inprogress_requests', 'Description of gauge')
385 g.inc() # Increment by 1
386 g.dec(10) # Decrement by given value
387 g.set(4.2) # Set to a given value
388
389 There are utilities for common use cases:
390
391 g.set_to_current_time() # Set to current unixtime
392
393 # Increment when entered, decrement when exited.
394 @g.track_inprogress()
395 def f():
396 pass
397
398 with g.track_inprogress():
399 pass
400
401 A Gauge can also take its value from a callback:
402
403 d = Gauge('data_objects', 'Number of objects')
404 my_dict = {}
405 d.set_function(lambda: len(my_dict))
406 """
407 _type = 'gauge'
408 _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
409 _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))
410
411 def __init__(self,
412 name: str,
413 documentation: str,
414 labelnames: Iterable[str] = (),
415 namespace: str = '',
416 subsystem: str = '',
417 unit: str = '',
418 registry: Optional[CollectorRegistry] = REGISTRY,
419 _labelvalues: Optional[Sequence[str]] = None,
420 multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
421 ):
422 self._multiprocess_mode = multiprocess_mode
423 if multiprocess_mode not in self._MULTIPROC_MODES:
424 raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
425 super().__init__(
426 name=name,
427 documentation=documentation,
428 labelnames=labelnames,
429 namespace=namespace,
430 subsystem=subsystem,
431 unit=unit,
432 registry=registry,
433 _labelvalues=_labelvalues,
434 )
435 self._kwargs['multiprocess_mode'] = self._multiprocess_mode
436 self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES
437
438 def _metric_init(self) -> None:
439 self._value = values.ValueClass(
440 self._type, self._name, self._name, self._labelnames, self._labelvalues,
441 self._documentation, multiprocess_mode=self._multiprocess_mode
442 )
443
444 def inc(self, amount: float = 1) -> None:
445 """Increment gauge by the given amount."""
446 if self._is_most_recent:
447 raise RuntimeError("inc must not be used with the mostrecent mode")
448 self._raise_if_not_observable()
449 self._value.inc(amount)
450
451 def dec(self, amount: float = 1) -> None:
452 """Decrement gauge by the given amount."""
453 if self._is_most_recent:
454 raise RuntimeError("dec must not be used with the mostrecent mode")
455 self._raise_if_not_observable()
456 self._value.inc(-amount)
457
458 def set(self, value: float) -> None:
459 """Set gauge to the given value."""
460 self._raise_if_not_observable()
461 if self._is_most_recent:
462 self._value.set(float(value), timestamp=time.time())
463 else:
464 self._value.set(float(value))
465
466 def set_to_current_time(self) -> None:
467 """Set gauge to the current unixtime."""
468 self.set(time.time())
469
470 def track_inprogress(self) -> InprogressTracker:
471 """Track inprogress blocks of code or functions.
472
473 Can be used as a function decorator or context manager.
474 Increments the gauge when the code is entered,
475 and decrements when it is exited.
476 """
477 self._raise_if_not_observable()
478 return InprogressTracker(self)
479
480 def time(self) -> Timer:
481 """Time a block of code or function, and set the duration in seconds.
482
483 Can be used as a function decorator or context manager.
484 """
485 return Timer(self, 'set')
486
487 def set_function(self, f: Callable[[], float]) -> None:
488 """Call the provided function to return the Gauge value.
489
490 The function must return a float, and may be called from
491 multiple threads. All other methods of the Gauge become NOOPs.
492 """
493
494 self._raise_if_not_observable()
495
496 def samples(_: Gauge) -> Iterable[Sample]:
497 return (Sample('', {}, float(f()), None, None),)
498
499 self._child_samples = types.MethodType(samples, self) # type: ignore
500
501 def _child_samples(self) -> Iterable[Sample]:
502 return (Sample('', {}, self._value.get(), None, None),)
503
504
505class Summary(MetricWrapperBase):
506 """A Summary tracks the size and number of events.
507
508 Example use cases for Summaries:
509 - Response latency
510 - Request size
511
512 Example for a Summary:
513
514 from prometheus_client import Summary
515
516 s = Summary('request_size_bytes', 'Request size (bytes)')
517 s.observe(512) # Observe 512 (bytes)
518
519 Example for a Summary using time:
520
521 from prometheus_client import Summary
522
523 REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')
524
525 @REQUEST_TIME.time()
526 def create_response(request):
527 '''A dummy function'''
528 time.sleep(1)
529
530 Example for using the same Summary object as a context manager:
531
532 with REQUEST_TIME.time():
533 pass # Logic to be timed
534 """
535 _type = 'summary'
536 _reserved_labelnames = ['quantile']
537
538 def _metric_init(self) -> None:
539 self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames,
540 self._labelvalues, self._documentation)
541 self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
542 self._created = time.time()
543
544 def observe(self, amount: float) -> None:
545 """Observe the given amount.
546
547 The amount is usually positive or zero. Negative values are
548 accepted but prevent current versions of Prometheus from
549 properly detecting counter resets in the sum of
550 observations. See
551 https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
552 for details.
553 """
554 self._raise_if_not_observable()
555 self._count.inc(1)
556 self._sum.inc(amount)
557
558 def time(self) -> Timer:
559 """Time a block of code or function, and observe the duration in seconds.
560
561 Can be used as a function decorator or context manager.
562 """
563 return Timer(self, 'observe')
564
565 def _child_samples(self) -> Iterable[Sample]:
566 samples = [
567 Sample('_count', {}, self._count.get(), None, None),
568 Sample('_sum', {}, self._sum.get(), None, None),
569 ]
570 if _use_created:
571 samples.append(Sample('_created', {}, self._created, None, None))
572 return tuple(samples)
573
574
575class Histogram(MetricWrapperBase):
576 """A Histogram tracks the size and number of events in buckets.
577
578 You can use Histograms for aggregatable calculation of quantiles.
579
580 Example use cases:
581 - Response latency
582 - Request size
583
584 Example for a Histogram:
585
586 from prometheus_client import Histogram
587
588 h = Histogram('request_size_bytes', 'Request size (bytes)')
589 h.observe(512) # Observe 512 (bytes)
590
591 Example for a Histogram using time:
592
593 from prometheus_client import Histogram
594
595 REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')
596
597 @REQUEST_TIME.time()
598 def create_response(request):
599 '''A dummy function'''
600 time.sleep(1)
601
602 Example of using the same Histogram object as a context manager:
603
604 with REQUEST_TIME.time():
605 pass # Logic to be timed
606
607 The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
608 They can be overridden by passing `buckets` keyword argument to `Histogram`.
609 """
610 _type = 'histogram'
611 _reserved_labelnames = ['le']
612 DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)
613
614 def __init__(self,
615 name: str,
616 documentation: str,
617 labelnames: Iterable[str] = (),
618 namespace: str = '',
619 subsystem: str = '',
620 unit: str = '',
621 registry: Optional[CollectorRegistry] = REGISTRY,
622 _labelvalues: Optional[Sequence[str]] = None,
623 buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
624 ):
625 self._prepare_buckets(buckets)
626 super().__init__(
627 name=name,
628 documentation=documentation,
629 labelnames=labelnames,
630 namespace=namespace,
631 subsystem=subsystem,
632 unit=unit,
633 registry=registry,
634 _labelvalues=_labelvalues,
635 )
636 self._kwargs['buckets'] = buckets
637
638 def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None:
639 buckets = [float(b) for b in source_buckets]
640 if buckets != sorted(buckets):
641 # This is probably an error on the part of the user,
642 # so raise rather than sorting for them.
643 raise ValueError('Buckets not in sorted order')
644 if buckets and buckets[-1] != INF:
645 buckets.append(INF)
646 if len(buckets) < 2:
647 raise ValueError('Must have at least two buckets')
648 self._upper_bounds = buckets
649
650 def _metric_init(self) -> None:
651 self._buckets: List[values.ValueClass] = []
652 self._created = time.time()
653 bucket_labelnames = self._labelnames + ('le',)
654 self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
655 for b in self._upper_bounds:
656 self._buckets.append(values.ValueClass(
657 self._type,
658 self._name,
659 self._name + '_bucket',
660 bucket_labelnames,
661 self._labelvalues + (floatToGoString(b),),
662 self._documentation)
663 )
664
665 def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
666 """Observe the given amount.
667
668 The amount is usually positive or zero. Negative values are
669 accepted but prevent current versions of Prometheus from
670 properly detecting counter resets in the sum of
671 observations. See
672 https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
673 for details.
674 """
675 self._raise_if_not_observable()
676 self._sum.inc(amount)
677 for i, bound in enumerate(self._upper_bounds):
678 if amount <= bound:
679 self._buckets[i].inc(1)
680 if exemplar:
681 _validate_exemplar(exemplar)
682 self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time()))
683 break
684
685 def time(self) -> Timer:
686 """Time a block of code or function, and observe the duration in seconds.
687
688 Can be used as a function decorator or context manager.
689 """
690 return Timer(self, 'observe')
691
692 def _child_samples(self) -> Iterable[Sample]:
693 samples = []
694 acc = 0.0
695 for i, bound in enumerate(self._upper_bounds):
696 acc += self._buckets[i].get()
697 samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar()))
698 samples.append(Sample('_count', {}, acc, None, None))
699 if self._upper_bounds[0] >= 0:
700 samples.append(Sample('_sum', {}, self._sum.get(), None, None))
701 if _use_created:
702 samples.append(Sample('_created', {}, self._created, None, None))
703 return tuple(samples)
704
705
706class Info(MetricWrapperBase):
707 """Info metric, key-value pairs.
708
709 Examples of Info include:
710 - Build information
711 - Version information
712 - Potential target metadata
713
714 Example usage:
715 from prometheus_client import Info
716
717 i = Info('my_build', 'Description of info')
718 i.info({'version': '1.2.3', 'buildhost': 'foo@bar'})
719
720 Info metrics do not work in multiprocess mode.
721 """
722 _type = 'info'
723
724 def _metric_init(self):
725 self._labelname_set = set(self._labelnames)
726 self._lock = Lock()
727 self._value = {}
728
729 def info(self, val: Dict[str, str]) -> None:
730 """Set info metric."""
731 if self._labelname_set.intersection(val.keys()):
732 raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format(
733 self._labelnames, val))
734 if any(i is None for i in val.values()):
735 raise ValueError('Label value cannot be None')
736 with self._lock:
737 self._value = dict(val)
738
739 def _child_samples(self) -> Iterable[Sample]:
740 with self._lock:
741 return (Sample('_info', self._value, 1.0, None, None),)
742
743
744class Enum(MetricWrapperBase):
745 """Enum metric, which of a set of states is true.
746
747 Example usage:
748 from prometheus_client import Enum
749
750 e = Enum('task_state', 'Description of enum',
751 states=['starting', 'running', 'stopped'])
752 e.state('running')
753
754 The first listed state will be the default.
755 Enum metrics do not work in multiprocess mode.
756 """
757 _type = 'stateset'
758
759 def __init__(self,
760 name: str,
761 documentation: str,
762 labelnames: Sequence[str] = (),
763 namespace: str = '',
764 subsystem: str = '',
765 unit: str = '',
766 registry: Optional[CollectorRegistry] = REGISTRY,
767 _labelvalues: Optional[Sequence[str]] = None,
768 states: Optional[Sequence[str]] = None,
769 ):
770 super().__init__(
771 name=name,
772 documentation=documentation,
773 labelnames=labelnames,
774 namespace=namespace,
775 subsystem=subsystem,
776 unit=unit,
777 registry=registry,
778 _labelvalues=_labelvalues,
779 )
780 if name in labelnames:
781 raise ValueError(f'Overlapping labels for Enum metric: {name}')
782 if not states:
783 raise ValueError(f'No states provided for Enum metric: {name}')
784 self._kwargs['states'] = self._states = states
785
786 def _metric_init(self) -> None:
787 self._value = 0
788 self._lock = Lock()
789
790 def state(self, state: str) -> None:
791 """Set enum metric state."""
792 self._raise_if_not_observable()
793 with self._lock:
794 self._value = self._states.index(state)
795
796 def _child_samples(self) -> Iterable[Sample]:
797 with self._lock:
798 return [
799 Sample('', {self._name: s}, 1 if i == self._value else 0, None, None)
800 for i, s
801 in enumerate(self._states)
802 ]