1from abc import ABC, abstractmethod
2import copy
3from threading import Lock
4from typing import Dict, Iterable, List, Optional
5
6from .metrics_core import Metric
7
8
9# Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8.
class Collector(ABC):
    """Abstract interface for objects that can produce metrics.

    Concrete subclasses must override ``collect``; until they do, the class
    cannot be instantiated.
    """

    @abstractmethod
    def collect(self) -> Iterable[Metric]:
        """Return the metrics this collector exposes."""
14
15
class _EmptyCollector(Collector):
    """Collector that never reports any metrics."""

    def collect(self) -> Iterable[Metric]:
        """Return an empty list; this collector has nothing to expose."""
        return []
19
20
class CollectorRegistry(Collector):
    """Metric collector registry.

    Collectors must have a no-argument method 'collect' that returns a list of
    Metric objects. The returned metrics should be consistent with the Prometheus
    exposition formats.

    Two mappings are maintained under a single lock: each registered collector
    to the timeseries names it claimed, and each claimed name back to its
    collector. The second mapping is what detects clashes on registration.
    """

    def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
        """Create an empty registry.

        Args:
            auto_describe: If true, a collector without a ``describe``
                attribute has its ``collect`` method called at registration
                time to discover the names it produces.
            target_info: Optional labels for the ``target_info`` metric;
                forwarded to ``set_target_info``.
        """
        self._collector_to_names: Dict[Collector, List[str]] = {}
        self._names_to_collectors: Dict[str, Collector] = {}
        self._auto_describe = auto_describe
        self._lock = Lock()
        self._target_info: Optional[Dict[str, str]] = {}
        # Must run last: it relies on the lock and both mappings existing.
        self.set_target_info(target_info)

    def register(self, collector: Collector) -> None:
        """Add a collector to the registry.

        Raises:
            ValueError: If any name the collector produces is already claimed
                in this registry.
        """
        with self._lock:
            names = self._get_names(collector)
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    def unregister(self, collector: Collector) -> None:
        """Remove a collector from the registry.

        Raises:
            KeyError: If the collector is not currently registered.
        """
        with self._lock:
            # Indexing (not .get) keeps the KeyError for unknown collectors.
            names = self._collector_to_names[collector]
            for name in names:
                # _get_names may list the same name more than once for a single
                # collector, and register() tolerates that. A plain `del` would
                # raise KeyError on the repeat and leave the registry half
                # updated, so removal has to tolerate an already-removed name.
                self._names_to_collectors.pop(name, None)
            del self._collector_to_names[collector]

    def _get_names(self, collector):
        """Get names of timeseries the collector produces and clashes with.

        Uses the collector's ``describe`` attribute when present; otherwise
        falls back to ``collect`` if auto describe is enabled. Returns an
        empty list when neither is available. Each metric contributes its own
        name plus that name with every suffix listed for its type.
        """
        # If there's a describe function, use it.
        desc_func = getattr(collector, 'describe', None)
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        type_suffixes = {
            'counter': ['_total', '_created'],
            'summary': ['_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        result = []
        for metric in desc_func():
            result.append(metric.name)
            result.extend(
                metric.name + suffix
                for suffix in type_suffixes.get(metric.type, [])
            )
        return result

    def collect(self) -> Iterable[Metric]:
        """Yields metrics from the collectors in the registry.

        The set of collectors and the target info metric are captured while
        holding the lock; the collectors themselves are then invoked after
        the lock has been released.
        """
        ti = None
        with self._lock:
            collectors = list(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            yield from collector.collect()

    def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
        """Returns object that only collects some metrics.

        Returns an object which upon collect() will return
        only samples with the given names.

        Intended usage is:
            generate_latest(REGISTRY.restricted_registry(['a_timeseries']))

        Experimental."""
        return RestrictedRegistry(set(names), self)

    def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
        """Set or clear the labels of the registry's target_info metric.

        Non-empty labels reserve the 'target_info' name in the registry;
        empty or None labels release it if labels were previously set.

        Raises:
            ValueError: If labels are given, none were set before, and the
                'target_info' name is already claimed.
        """
        with self._lock:
            if labels:
                if not self._target_info and 'target_info' in self._names_to_collectors:
                    raise ValueError('CollectorRegistry already contains a target_info metric')
                # Placeholder owner so register() rejects a clashing collector.
                self._names_to_collectors['target_info'] = _EmptyCollector()
            elif self._target_info:
                self._names_to_collectors.pop('target_info', None)
            self._target_info = labels

    def get_target_info(self) -> Optional[Dict[str, str]]:
        """Return the currently stored target info labels (not a copy)."""
        with self._lock:
            return self._target_info

    def _target_info_metric(self):
        """Build the info metric carrying the stored target info labels."""
        m = Metric('target', 'Target metadata', 'info')
        m.add_sample('target_info', self._target_info, 1)
        return m

    def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
        """Returns the sample value, or None if not found.

        This is inefficient, and intended only for use in unittests.
        """
        if labels is None:
            labels = {}
        for metric in self.collect():
            for s in metric.samples:
                if s.name == name and s.labels == labels:
                    return s.value
        return None
143
144
class RestrictedRegistry:
    """View over a registry that only yields metrics for selected names."""

    def __init__(self, names: Iterable[str], registry: CollectorRegistry):
        """Remember the wanted names (as a set) and the backing registry."""
        self._name_set = set(names)
        self._registry = registry

    def collect(self) -> Iterable[Metric]:
        """Yield the target info metric (if wanted and set), then the
        restricted form of every metric from the collectors owning a wanted
        name, skipping metrics whose restricted form is falsy."""
        registry = self._registry
        wanted = self._name_set
        info_metric = None
        # Resolve names to collectors under the registry lock; the collectors
        # are only invoked once the lock has been released.
        with registry._lock:
            if 'target_info' in wanted and registry._target_info:
                info_metric = registry._target_info_metric()
            by_name = registry._names_to_collectors
            selected = {
                by_name[name]
                for name in wanted
                if name != 'target_info' and name in by_name
            }
        if info_metric:
            yield info_metric
        for collector in selected:
            for metric in collector.collect():
                restricted = metric._restricted_metric(wanted)
                if not restricted:
                    continue
                yield restricted
166
167
# Module-level registry instance. It is created with auto_describe enabled, so
# a collector without a describe() method has collect() called at registration
# time to discover the names it produces.
# NOTE(review): presumably the process-wide default registry; confirm against callers.
REGISTRY = CollectorRegistry(auto_describe=True)