1import os
2from threading import Lock
3import time
4import types
5from typing import (
6 Any, Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple,
7 Type, TypeVar, Union,
8)
9import warnings
10
11from . import values # retain this import style for testability
12from .context_managers import ExceptionCounter, InprogressTracker, Timer
13from .metrics_core import (
14 Metric, METRIC_LABEL_NAME_RE, METRIC_NAME_RE,
15 RESERVED_METRIC_LABEL_NAME_RE,
16)
17from .registry import Collector, CollectorRegistry, REGISTRY
18from .samples import Exemplar, Sample
19from .utils import floatToGoString, INF
20
21T = TypeVar('T', bound='MetricWrapperBase')
22F = TypeVar("F", bound=Callable[..., Any])
23
24
25def _build_full_name(metric_type, name, namespace, subsystem, unit):
26 full_name = ''
27 if namespace:
28 full_name += namespace + '_'
29 if subsystem:
30 full_name += subsystem + '_'
31 full_name += name
32 if metric_type == 'counter' and full_name.endswith('_total'):
33 full_name = full_name[:-6] # Munge to OpenMetrics.
34 if unit and not full_name.endswith("_" + unit):
35 full_name += "_" + unit
36 if unit and metric_type in ('info', 'stateset'):
37 raise ValueError('Metric name is of a type that cannot have a unit: ' + full_name)
38 return full_name
39
40
41def _validate_labelname(l):
42 if not METRIC_LABEL_NAME_RE.match(l):
43 raise ValueError('Invalid label metric name: ' + l)
44 if RESERVED_METRIC_LABEL_NAME_RE.match(l):
45 raise ValueError('Reserved label metric name: ' + l)
46
47
48def _validate_labelnames(cls, labelnames):
49 labelnames = tuple(labelnames)
50 for l in labelnames:
51 _validate_labelname(l)
52 if l in cls._reserved_labelnames:
53 raise ValueError('Reserved label metric name: ' + l)
54 return labelnames
55
56
57def _validate_exemplar(exemplar):
58 runes = 0
59 for k, v in exemplar.items():
60 _validate_labelname(k)
61 runes += len(k)
62 runes += len(v)
63 if runes > 128:
64 raise ValueError('Exemplar labels have %d UTF-8 characters, exceeding the limit of 128')
65
66
67def _get_use_created() -> bool:
68 return os.environ.get("PROMETHEUS_DISABLE_CREATED_SERIES", 'False').lower() not in ('true', '1', 't')
69
70
71_use_created = _get_use_created()
72
73
74def disable_created_metrics():
75 """Disable exporting _created metrics on counters, histograms, and summaries."""
76 global _use_created
77 _use_created = False
78
79
80def enable_created_metrics():
81 """Enable exporting _created metrics on counters, histograms, and summaries."""
82 global _use_created
83 _use_created = True
84
85
86class MetricWrapperBase(Collector):
87 _type: Optional[str] = None
88 _reserved_labelnames: Sequence[str] = ()
89
90 def _is_observable(self):
91 # Whether this metric is observable, i.e.
92 # * a metric without label names and values, or
93 # * the child of a labelled metric.
94 return not self._labelnames or (self._labelnames and self._labelvalues)
95
96 def _raise_if_not_observable(self):
97 # Functions that mutate the state of the metric, for example incrementing
98 # a counter, will fail if the metric is not observable, because only if a
99 # metric is observable will the value be initialized.
100 if not self._is_observable():
101 raise ValueError('%s metric is missing label values' % str(self._type))
102
103 def _is_parent(self):
104 return self._labelnames and not self._labelvalues
105
106 def _get_metric(self):
107 return Metric(self._name, self._documentation, self._type, self._unit)
108
109 def describe(self) -> Iterable[Metric]:
110 return [self._get_metric()]
111
112 def collect(self) -> Iterable[Metric]:
113 metric = self._get_metric()
114 for suffix, labels, value, timestamp, exemplar in self._samples():
115 metric.add_sample(self._name + suffix, labels, value, timestamp, exemplar)
116 return [metric]
117
118 def __str__(self) -> str:
119 return f"{self._type}:{self._name}"
120
121 def __repr__(self) -> str:
122 metric_type = type(self)
123 return f"{metric_type.__module__}.{metric_type.__name__}({self._name})"
124
125 def __init__(self: T,
126 name: str,
127 documentation: str,
128 labelnames: Iterable[str] = (),
129 namespace: str = '',
130 subsystem: str = '',
131 unit: str = '',
132 registry: Optional[CollectorRegistry] = REGISTRY,
133 _labelvalues: Optional[Sequence[str]] = None,
134 ) -> None:
135 self._name = _build_full_name(self._type, name, namespace, subsystem, unit)
136 self._labelnames = _validate_labelnames(self, labelnames)
137 self._labelvalues = tuple(_labelvalues or ())
138 self._kwargs: Dict[str, Any] = {}
139 self._documentation = documentation
140 self._unit = unit
141
142 if not METRIC_NAME_RE.match(self._name):
143 raise ValueError('Invalid metric name: ' + self._name)
144
145 if self._is_parent():
146 # Prepare the fields needed for child metrics.
147 self._lock = Lock()
148 self._metrics: Dict[Sequence[str], T] = {}
149
150 if self._is_observable():
151 self._metric_init()
152
153 if not self._labelvalues:
154 # Register the multi-wrapper parent metric, or if a label-less metric, the whole shebang.
155 if registry:
156 registry.register(self)
157
158 def labels(self: T, *labelvalues: Any, **labelkwargs: Any) -> T:
159 """Return the child for the given labelset.
160
161 All metrics can have labels, allowing grouping of related time series.
162 Taking a counter as an example:
163
164 from prometheus_client import Counter
165
166 c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
167 c.labels('get', '/').inc()
168 c.labels('post', '/submit').inc()
169
170 Labels can also be provided as keyword arguments:
171
172 from prometheus_client import Counter
173
174 c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint'])
175 c.labels(method='get', endpoint='/').inc()
176 c.labels(method='post', endpoint='/submit').inc()
177
178 See the best practices on [naming](http://prometheus.io/docs/practices/naming/)
179 and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels).
180 """
181 if not self._labelnames:
182 raise ValueError('No label names were set when constructing %s' % self)
183
184 if self._labelvalues:
185 raise ValueError('{} already has labels set ({}); can not chain calls to .labels()'.format(
186 self,
187 dict(zip(self._labelnames, self._labelvalues))
188 ))
189
190 if labelvalues and labelkwargs:
191 raise ValueError("Can't pass both *args and **kwargs")
192
193 if labelkwargs:
194 if sorted(labelkwargs) != sorted(self._labelnames):
195 raise ValueError('Incorrect label names')
196 labelvalues = tuple(str(labelkwargs[l]) for l in self._labelnames)
197 else:
198 if len(labelvalues) != len(self._labelnames):
199 raise ValueError('Incorrect label count')
200 labelvalues = tuple(str(l) for l in labelvalues)
201 with self._lock:
202 if labelvalues not in self._metrics:
203 self._metrics[labelvalues] = self.__class__(
204 self._name,
205 documentation=self._documentation,
206 labelnames=self._labelnames,
207 unit=self._unit,
208 _labelvalues=labelvalues,
209 **self._kwargs
210 )
211 return self._metrics[labelvalues]
212
213 def remove(self, *labelvalues: Any) -> None:
214 if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
215 warnings.warn(
216 "Removal of labels has not been implemented in multi-process mode yet.",
217 UserWarning)
218
219 if not self._labelnames:
220 raise ValueError('No label names were set when constructing %s' % self)
221
222 """Remove the given labelset from the metric."""
223 if len(labelvalues) != len(self._labelnames):
224 raise ValueError('Incorrect label count (expected %d, got %s)' % (len(self._labelnames), labelvalues))
225 labelvalues = tuple(str(l) for l in labelvalues)
226 with self._lock:
227 del self._metrics[labelvalues]
228
229 def clear(self) -> None:
230 """Remove all labelsets from the metric"""
231 if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
232 warnings.warn(
233 "Clearing labels has not been implemented in multi-process mode yet",
234 UserWarning)
235 with self._lock:
236 self._metrics = {}
237
238 def _samples(self) -> Iterable[Sample]:
239 if self._is_parent():
240 return self._multi_samples()
241 else:
242 return self._child_samples()
243
244 def _multi_samples(self) -> Iterable[Sample]:
245 with self._lock:
246 metrics = self._metrics.copy()
247 for labels, metric in metrics.items():
248 series_labels = list(zip(self._labelnames, labels))
249 for suffix, sample_labels, value, timestamp, exemplar in metric._samples():
250 yield Sample(suffix, dict(series_labels + list(sample_labels.items())), value, timestamp, exemplar)
251
252 def _child_samples(self) -> Iterable[Sample]: # pragma: no cover
253 raise NotImplementedError('_child_samples() must be implemented by %r' % self)
254
255 def _metric_init(self): # pragma: no cover
256 """
257 Initialize the metric object as a child, i.e. when it has labels (if any) set.
258
259 This is factored as a separate function to allow for deferred initialization.
260 """
261 raise NotImplementedError('_metric_init() must be implemented by %r' % self)
262
263
264class Counter(MetricWrapperBase):
265 """A Counter tracks counts of events or running totals.
266
267 Example use cases for Counters:
268 - Number of requests processed
269 - Number of items that were inserted into a queue
270 - Total amount of data that a system has processed
271
272 Counters can only go up (and be reset when the process restarts). If your use case can go down,
273 you should use a Gauge instead.
274
275 An example for a Counter:
276
277 from prometheus_client import Counter
278
279 c = Counter('my_failures_total', 'Description of counter')
280 c.inc() # Increment by 1
281 c.inc(1.6) # Increment by given value
282
283 There are utilities to count exceptions raised:
284
285 @c.count_exceptions()
286 def f():
287 pass
288
289 with c.count_exceptions():
290 pass
291
292 # Count only one type of exception
293 with c.count_exceptions(ValueError):
294 pass
295
296 You can also reset the counter to zero in case your logical "process" restarts
297 without restarting the actual python process.
298
299 c.reset()
300
301 """
302 _type = 'counter'
303
304 def _metric_init(self) -> None:
305 self._value = values.ValueClass(self._type, self._name, self._name + '_total', self._labelnames,
306 self._labelvalues, self._documentation)
307 self._created = time.time()
308
309 def inc(self, amount: float = 1, exemplar: Optional[Dict[str, str]] = None) -> None:
310 """Increment counter by the given amount."""
311 self._raise_if_not_observable()
312 if amount < 0:
313 raise ValueError('Counters can only be incremented by non-negative amounts.')
314 self._value.inc(amount)
315 if exemplar:
316 _validate_exemplar(exemplar)
317 self._value.set_exemplar(Exemplar(exemplar, amount, time.time()))
318
319 def reset(self) -> None:
320 """Reset the counter to zero. Use this when a logical process restarts without restarting the actual python process."""
321 self._value.set(0)
322 self._created = time.time()
323
324 def count_exceptions(self, exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception) -> ExceptionCounter:
325 """Count exceptions in a block of code or function.
326
327 Can be used as a function decorator or context manager.
328 Increments the counter when an exception of the given
329 type is raised up out of the code.
330 """
331 self._raise_if_not_observable()
332 return ExceptionCounter(self, exception)
333
334 def _child_samples(self) -> Iterable[Sample]:
335 sample = Sample('_total', {}, self._value.get(), None, self._value.get_exemplar())
336 if _use_created:
337 return (
338 sample,
339 Sample('_created', {}, self._created, None, None)
340 )
341 return (sample,)
342
343
344class Gauge(MetricWrapperBase):
345 """Gauge metric, to report instantaneous values.
346
347 Examples of Gauges include:
348 - Inprogress requests
349 - Number of items in a queue
350 - Free memory
351 - Total memory
352 - Temperature
353
354 Gauges can go both up and down.
355
356 from prometheus_client import Gauge
357
358 g = Gauge('my_inprogress_requests', 'Description of gauge')
359 g.inc() # Increment by 1
360 g.dec(10) # Decrement by given value
361 g.set(4.2) # Set to a given value
362
363 There are utilities for common use cases:
364
365 g.set_to_current_time() # Set to current unixtime
366
367 # Increment when entered, decrement when exited.
368 @g.track_inprogress()
369 def f():
370 pass
371
372 with g.track_inprogress():
373 pass
374
375 A Gauge can also take its value from a callback:
376
377 d = Gauge('data_objects', 'Number of objects')
378 my_dict = {}
379 d.set_function(lambda: len(my_dict))
380 """
381 _type = 'gauge'
382 _MULTIPROC_MODES = frozenset(('all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'))
383 _MOST_RECENT_MODES = frozenset(('mostrecent', 'livemostrecent'))
384
385 def __init__(self,
386 name: str,
387 documentation: str,
388 labelnames: Iterable[str] = (),
389 namespace: str = '',
390 subsystem: str = '',
391 unit: str = '',
392 registry: Optional[CollectorRegistry] = REGISTRY,
393 _labelvalues: Optional[Sequence[str]] = None,
394 multiprocess_mode: Literal['all', 'liveall', 'min', 'livemin', 'max', 'livemax', 'sum', 'livesum', 'mostrecent', 'livemostrecent'] = 'all',
395 ):
396 self._multiprocess_mode = multiprocess_mode
397 if multiprocess_mode not in self._MULTIPROC_MODES:
398 raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode)
399 super().__init__(
400 name=name,
401 documentation=documentation,
402 labelnames=labelnames,
403 namespace=namespace,
404 subsystem=subsystem,
405 unit=unit,
406 registry=registry,
407 _labelvalues=_labelvalues,
408 )
409 self._kwargs['multiprocess_mode'] = self._multiprocess_mode
410 self._is_most_recent = self._multiprocess_mode in self._MOST_RECENT_MODES
411
412 def _metric_init(self) -> None:
413 self._value = values.ValueClass(
414 self._type, self._name, self._name, self._labelnames, self._labelvalues,
415 self._documentation, multiprocess_mode=self._multiprocess_mode
416 )
417
418 def inc(self, amount: float = 1) -> None:
419 """Increment gauge by the given amount."""
420 if self._is_most_recent:
421 raise RuntimeError("inc must not be used with the mostrecent mode")
422 self._raise_if_not_observable()
423 self._value.inc(amount)
424
425 def dec(self, amount: float = 1) -> None:
426 """Decrement gauge by the given amount."""
427 if self._is_most_recent:
428 raise RuntimeError("dec must not be used with the mostrecent mode")
429 self._raise_if_not_observable()
430 self._value.inc(-amount)
431
432 def set(self, value: float) -> None:
433 """Set gauge to the given value."""
434 self._raise_if_not_observable()
435 if self._is_most_recent:
436 self._value.set(float(value), timestamp=time.time())
437 else:
438 self._value.set(float(value))
439
440 def set_to_current_time(self) -> None:
441 """Set gauge to the current unixtime."""
442 self.set(time.time())
443
444 def track_inprogress(self) -> InprogressTracker:
445 """Track inprogress blocks of code or functions.
446
447 Can be used as a function decorator or context manager.
448 Increments the gauge when the code is entered,
449 and decrements when it is exited.
450 """
451 self._raise_if_not_observable()
452 return InprogressTracker(self)
453
454 def time(self) -> Timer:
455 """Time a block of code or function, and set the duration in seconds.
456
457 Can be used as a function decorator or context manager.
458 """
459 return Timer(self, 'set')
460
461 def set_function(self, f: Callable[[], float]) -> None:
462 """Call the provided function to return the Gauge value.
463
464 The function must return a float, and may be called from
465 multiple threads. All other methods of the Gauge become NOOPs.
466 """
467
468 self._raise_if_not_observable()
469
470 def samples(_: Gauge) -> Iterable[Sample]:
471 return (Sample('', {}, float(f()), None, None),)
472
473 self._child_samples = types.MethodType(samples, self) # type: ignore
474
475 def _child_samples(self) -> Iterable[Sample]:
476 return (Sample('', {}, self._value.get(), None, None),)
477
478
479class Summary(MetricWrapperBase):
480 """A Summary tracks the size and number of events.
481
482 Example use cases for Summaries:
483 - Response latency
484 - Request size
485
486 Example for a Summary:
487
488 from prometheus_client import Summary
489
490 s = Summary('request_size_bytes', 'Request size (bytes)')
491 s.observe(512) # Observe 512 (bytes)
492
493 Example for a Summary using time:
494
495 from prometheus_client import Summary
496
497 REQUEST_TIME = Summary('response_latency_seconds', 'Response latency (seconds)')
498
499 @REQUEST_TIME.time()
500 def create_response(request):
501 '''A dummy function'''
502 time.sleep(1)
503
504 Example for using the same Summary object as a context manager:
505
506 with REQUEST_TIME.time():
507 pass # Logic to be timed
508 """
509 _type = 'summary'
510 _reserved_labelnames = ['quantile']
511
512 def _metric_init(self) -> None:
513 self._count = values.ValueClass(self._type, self._name, self._name + '_count', self._labelnames,
514 self._labelvalues, self._documentation)
515 self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
516 self._created = time.time()
517
518 def observe(self, amount: float) -> None:
519 """Observe the given amount.
520
521 The amount is usually positive or zero. Negative values are
522 accepted but prevent current versions of Prometheus from
523 properly detecting counter resets in the sum of
524 observations. See
525 https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
526 for details.
527 """
528 self._raise_if_not_observable()
529 self._count.inc(1)
530 self._sum.inc(amount)
531
532 def time(self) -> Timer:
533 """Time a block of code or function, and observe the duration in seconds.
534
535 Can be used as a function decorator or context manager.
536 """
537 return Timer(self, 'observe')
538
539 def _child_samples(self) -> Iterable[Sample]:
540 samples = [
541 Sample('_count', {}, self._count.get(), None, None),
542 Sample('_sum', {}, self._sum.get(), None, None),
543 ]
544 if _use_created:
545 samples.append(Sample('_created', {}, self._created, None, None))
546 return tuple(samples)
547
548
549class Histogram(MetricWrapperBase):
550 """A Histogram tracks the size and number of events in buckets.
551
552 You can use Histograms for aggregatable calculation of quantiles.
553
554 Example use cases:
555 - Response latency
556 - Request size
557
558 Example for a Histogram:
559
560 from prometheus_client import Histogram
561
562 h = Histogram('request_size_bytes', 'Request size (bytes)')
563 h.observe(512) # Observe 512 (bytes)
564
565 Example for a Histogram using time:
566
567 from prometheus_client import Histogram
568
569 REQUEST_TIME = Histogram('response_latency_seconds', 'Response latency (seconds)')
570
571 @REQUEST_TIME.time()
572 def create_response(request):
573 '''A dummy function'''
574 time.sleep(1)
575
576 Example of using the same Histogram object as a context manager:
577
578 with REQUEST_TIME.time():
579 pass # Logic to be timed
580
581 The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds.
582 They can be overridden by passing `buckets` keyword argument to `Histogram`.
583 """
584 _type = 'histogram'
585 _reserved_labelnames = ['le']
586 DEFAULT_BUCKETS = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, INF)
587
588 def __init__(self,
589 name: str,
590 documentation: str,
591 labelnames: Iterable[str] = (),
592 namespace: str = '',
593 subsystem: str = '',
594 unit: str = '',
595 registry: Optional[CollectorRegistry] = REGISTRY,
596 _labelvalues: Optional[Sequence[str]] = None,
597 buckets: Sequence[Union[float, str]] = DEFAULT_BUCKETS,
598 ):
599 self._prepare_buckets(buckets)
600 super().__init__(
601 name=name,
602 documentation=documentation,
603 labelnames=labelnames,
604 namespace=namespace,
605 subsystem=subsystem,
606 unit=unit,
607 registry=registry,
608 _labelvalues=_labelvalues,
609 )
610 self._kwargs['buckets'] = buckets
611
612 def _prepare_buckets(self, source_buckets: Sequence[Union[float, str]]) -> None:
613 buckets = [float(b) for b in source_buckets]
614 if buckets != sorted(buckets):
615 # This is probably an error on the part of the user,
616 # so raise rather than sorting for them.
617 raise ValueError('Buckets not in sorted order')
618 if buckets and buckets[-1] != INF:
619 buckets.append(INF)
620 if len(buckets) < 2:
621 raise ValueError('Must have at least two buckets')
622 self._upper_bounds = buckets
623
624 def _metric_init(self) -> None:
625 self._buckets: List[values.ValueClass] = []
626 self._created = time.time()
627 bucket_labelnames = self._labelnames + ('le',)
628 self._sum = values.ValueClass(self._type, self._name, self._name + '_sum', self._labelnames, self._labelvalues, self._documentation)
629 for b in self._upper_bounds:
630 self._buckets.append(values.ValueClass(
631 self._type,
632 self._name,
633 self._name + '_bucket',
634 bucket_labelnames,
635 self._labelvalues + (floatToGoString(b),),
636 self._documentation)
637 )
638
639 def observe(self, amount: float, exemplar: Optional[Dict[str, str]] = None) -> None:
640 """Observe the given amount.
641
642 The amount is usually positive or zero. Negative values are
643 accepted but prevent current versions of Prometheus from
644 properly detecting counter resets in the sum of
645 observations. See
646 https://prometheus.io/docs/practices/histograms/#count-and-sum-of-observations
647 for details.
648 """
649 self._raise_if_not_observable()
650 self._sum.inc(amount)
651 for i, bound in enumerate(self._upper_bounds):
652 if amount <= bound:
653 self._buckets[i].inc(1)
654 if exemplar:
655 _validate_exemplar(exemplar)
656 self._buckets[i].set_exemplar(Exemplar(exemplar, amount, time.time()))
657 break
658
659 def time(self) -> Timer:
660 """Time a block of code or function, and observe the duration in seconds.
661
662 Can be used as a function decorator or context manager.
663 """
664 return Timer(self, 'observe')
665
666 def _child_samples(self) -> Iterable[Sample]:
667 samples = []
668 acc = 0.0
669 for i, bound in enumerate(self._upper_bounds):
670 acc += self._buckets[i].get()
671 samples.append(Sample('_bucket', {'le': floatToGoString(bound)}, acc, None, self._buckets[i].get_exemplar()))
672 samples.append(Sample('_count', {}, acc, None, None))
673 if self._upper_bounds[0] >= 0:
674 samples.append(Sample('_sum', {}, self._sum.get(), None, None))
675 if _use_created:
676 samples.append(Sample('_created', {}, self._created, None, None))
677 return tuple(samples)
678
679
680class Info(MetricWrapperBase):
681 """Info metric, key-value pairs.
682
683 Examples of Info include:
684 - Build information
685 - Version information
686 - Potential target metadata
687
688 Example usage:
689 from prometheus_client import Info
690
691 i = Info('my_build', 'Description of info')
692 i.info({'version': '1.2.3', 'buildhost': 'foo@bar'})
693
694 Info metrics do not work in multiprocess mode.
695 """
696 _type = 'info'
697
698 def _metric_init(self):
699 self._labelname_set = set(self._labelnames)
700 self._lock = Lock()
701 self._value = {}
702
703 def info(self, val: Dict[str, str]) -> None:
704 """Set info metric."""
705 if self._labelname_set.intersection(val.keys()):
706 raise ValueError('Overlapping labels for Info metric, metric: {} child: {}'.format(
707 self._labelnames, val))
708 with self._lock:
709 self._value = dict(val)
710
711 def _child_samples(self) -> Iterable[Sample]:
712 with self._lock:
713 return (Sample('_info', self._value, 1.0, None, None),)
714
715
716class Enum(MetricWrapperBase):
717 """Enum metric, which of a set of states is true.
718
719 Example usage:
720 from prometheus_client import Enum
721
722 e = Enum('task_state', 'Description of enum',
723 states=['starting', 'running', 'stopped'])
724 e.state('running')
725
726 The first listed state will be the default.
727 Enum metrics do not work in multiprocess mode.
728 """
729 _type = 'stateset'
730
731 def __init__(self,
732 name: str,
733 documentation: str,
734 labelnames: Sequence[str] = (),
735 namespace: str = '',
736 subsystem: str = '',
737 unit: str = '',
738 registry: Optional[CollectorRegistry] = REGISTRY,
739 _labelvalues: Optional[Sequence[str]] = None,
740 states: Optional[Sequence[str]] = None,
741 ):
742 super().__init__(
743 name=name,
744 documentation=documentation,
745 labelnames=labelnames,
746 namespace=namespace,
747 subsystem=subsystem,
748 unit=unit,
749 registry=registry,
750 _labelvalues=_labelvalues,
751 )
752 if name in labelnames:
753 raise ValueError(f'Overlapping labels for Enum metric: {name}')
754 if not states:
755 raise ValueError(f'No states provided for Enum metric: {name}')
756 self._kwargs['states'] = self._states = states
757
758 def _metric_init(self) -> None:
759 self._value = 0
760 self._lock = Lock()
761
762 def state(self, state: str) -> None:
763 """Set enum metric state."""
764 self._raise_if_not_observable()
765 with self._lock:
766 self._value = self._states.index(state)
767
768 def _child_samples(self) -> Iterable[Sample]:
769 with self._lock:
770 return [
771 Sample('', {self._name: s}, 1 if i == self._value else 0, None, None)
772 for i, s
773 in enumerate(self._states)
774 ]