1import copy
2from threading import Lock
3from typing import Dict, Iterable, List, Optional, Protocol
4
5from .metrics_core import Metric
6
7
8class Collector(Protocol):
9 def collect(self) -> Iterable[Metric]:
10 """Collect metrics."""
11
12
13class _EmptyCollector:
14 def collect(self) -> Iterable[Metric]:
15 return []
16
17
18class CollectorRegistry:
19 """Metric collector registry.
20
21 Collectors must have a no-argument method 'collect' that returns a list of
22 Metric objects. The returned metrics should be consistent with the Prometheus
23 exposition formats.
24 """
25
26 def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
27 self._collector_to_names: Dict[Collector, List[str]] = {}
28 self._names_to_collectors: Dict[str, Collector] = {}
29 self._auto_describe = auto_describe
30 self._lock = Lock()
31 self._target_info: Optional[Dict[str, str]] = {}
32 self.set_target_info(target_info)
33
34 def register(self, collector: Collector) -> None:
35 """Add a collector to the registry."""
36 with self._lock:
37 names = self._get_names(collector)
38 duplicates = set(self._names_to_collectors).intersection(names)
39 if duplicates:
40 raise ValueError(
41 'Duplicated timeseries in CollectorRegistry: {}'.format(
42 duplicates))
43 for name in names:
44 self._names_to_collectors[name] = collector
45 self._collector_to_names[collector] = names
46
47 def unregister(self, collector: Collector) -> None:
48 """Remove a collector from the registry."""
49 with self._lock:
50 for name in self._collector_to_names[collector]:
51 del self._names_to_collectors[name]
52 del self._collector_to_names[collector]
53
54 def _get_names(self, collector):
55 """Get names of timeseries the collector produces and clashes with."""
56 desc_func = None
57 # If there's a describe function, use it.
58 try:
59 desc_func = collector.describe
60 except AttributeError:
61 pass
62 # Otherwise, if auto describe is enabled use the collect function.
63 if not desc_func and self._auto_describe:
64 desc_func = collector.collect
65
66 if not desc_func:
67 return []
68
69 result = []
70 type_suffixes = {
71 'counter': ['_total', '_created'],
72 'summary': ['_sum', '_count', '_created'],
73 'histogram': ['_bucket', '_sum', '_count', '_created'],
74 'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
75 'info': ['_info'],
76 }
77 for metric in desc_func():
78 result.append(metric.name)
79 for suffix in type_suffixes.get(metric.type, []):
80 result.append(metric.name + suffix)
81 return result
82
83 def collect(self) -> Iterable[Metric]:
84 """Yields metrics from the collectors in the registry."""
85 collectors = None
86 ti = None
87 with self._lock:
88 collectors = copy.copy(self._collector_to_names)
89 if self._target_info:
90 ti = self._target_info_metric()
91 if ti:
92 yield ti
93 for collector in collectors:
94 yield from collector.collect()
95
96 def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
97 """Returns object that only collects some metrics.
98
99 Returns an object which upon collect() will return
100 only samples with the given names.
101
102 Intended usage is:
103 generate_latest(REGISTRY.restricted_registry(['a_timeseries']), escaping)
104
105 Experimental."""
106 names = set(names)
107 return RestrictedRegistry(names, self)
108
109 def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
110 with self._lock:
111 if labels:
112 if not self._target_info and 'target_info' in self._names_to_collectors:
113 raise ValueError('CollectorRegistry already contains a target_info metric')
114 self._names_to_collectors['target_info'] = _EmptyCollector()
115 elif self._target_info:
116 self._names_to_collectors.pop('target_info', None)
117 self._target_info = labels
118
119 def get_target_info(self) -> Optional[Dict[str, str]]:
120 with self._lock:
121 return self._target_info
122
123 def _target_info_metric(self):
124 m = Metric('target', 'Target metadata', 'info')
125 m.add_sample('target_info', self._target_info, 1)
126 return m
127
128 def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
129 """Returns the sample value, or None if not found.
130
131 This is inefficient, and intended only for use in unittests.
132 """
133 if labels is None:
134 labels = {}
135 for metric in self.collect():
136 for s in metric.samples:
137 if s.name == name and s.labels == labels:
138 return s.value
139 return None
140
141
142class RestrictedRegistry:
143 def __init__(self, names: Iterable[str], registry: CollectorRegistry):
144 self._name_set = set(names)
145 self._registry = registry
146
147 def collect(self) -> Iterable[Metric]:
148 collectors = set()
149 target_info_metric = None
150 with self._registry._lock:
151 if 'target_info' in self._name_set and self._registry._target_info:
152 target_info_metric = self._registry._target_info_metric()
153 for name in self._name_set:
154 if name != 'target_info' and name in self._registry._names_to_collectors:
155 collectors.add(self._registry._names_to_collectors[name])
156 if target_info_metric:
157 yield target_info_metric
158 for collector in collectors:
159 for metric in collector.collect():
160 m = metric._restricted_metric(self._name_set)
161 if m:
162 yield m
163
164
165REGISTRY = CollectorRegistry(auto_describe=True)