Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/registry.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

107 statements  

1import copy 

2from threading import Lock 

3from typing import Dict, Iterable, List, Optional, Protocol 

4 

5from .metrics_core import Metric 

6 

7 

class Collector(Protocol):
    """Structural type for objects that can be registered with a registry.

    Any object exposing a no-argument ``collect`` method satisfies this
    protocol; no inheritance is required.
    """

    def collect(self) -> Iterable[Metric]:
        """Collect metrics."""
        ...

11 

12 

class _EmptyCollector:
    """Collector that never produces any metrics."""

    def collect(self) -> Iterable[Metric]:
        """Return a new, empty list of metrics."""
        no_metrics: list = []
        return no_metrics

16 

17 

class CollectorRegistry:
    """Metric collector registry.

    Collectors must have a no-argument method 'collect' that returns a list of
    Metric objects. The returned metrics should be consistent with the Prometheus
    exposition formats.
    """

    # Sample-name suffixes that each metric type reserves in addition to the
    # bare metric name; used to detect clashing timeseries at registration.
    _TYPE_SUFFIXES = {
        'counter': ['_total', '_created'],
        'summary': ['_sum', '_count', '_created'],
        'histogram': ['_bucket', '_sum', '_count', '_created'],
        'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
        'info': ['_info'],
    }

    def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
        """Create an empty registry.

        Args:
            auto_describe: When true, collectors without a ``describe``
                method have ``collect`` called at registration time to
                discover the names they produce.
            target_info: Optional labels for the ``target_info`` metric;
                forwarded to :meth:`set_target_info`.
        """
        self._collector_to_names: Dict[Collector, List[str]] = {}
        self._names_to_collectors: Dict[str, Collector] = {}
        self._auto_describe = auto_describe
        self._lock = Lock()
        self._target_info: Optional[Dict[str, str]] = {}
        self.set_target_info(target_info)

    def register(self, collector: Collector) -> None:
        """Add a collector to the registry.

        Raises:
            ValueError: If any name the collector produces is already
                claimed in this registry.
        """
        with self._lock:
            names = self._get_names(collector)
            duplicates = set(self._names_to_collectors).intersection(names)
            if duplicates:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {}'.format(
                        duplicates))
            for name in names:
                self._names_to_collectors[name] = collector
            self._collector_to_names[collector] = names

    def unregister(self, collector: Collector) -> None:
        """Remove a collector from the registry.

        Raises:
            KeyError: If the collector is not registered. The registry is
                left untouched in that case.
        """
        with self._lock:
            # Pop the collector first: an unknown collector raises KeyError
            # here, before any state has been modified.
            names = self._collector_to_names.pop(collector)
            for name in names:
                # register() accepts a name list that repeats a name (e.g. a
                # counter 'a' plus a gauge 'a_total'), so a name may already
                # have been removed earlier in this loop. A strict `del`
                # would raise midway and leave the registry half-cleaned.
                self._names_to_collectors.pop(name, None)

    def _get_names(self, collector):
        """Get names of timeseries the collector produces and clashes with."""
        # If there's a describe function, use it.
        desc_func = getattr(collector, 'describe', None)
        # Otherwise, if auto describe is enabled use the collect function.
        if not desc_func and self._auto_describe:
            desc_func = collector.collect

        if not desc_func:
            return []

        result = []
        for metric in desc_func():
            result.append(metric.name)
            result.extend(
                metric.name + suffix
                for suffix in self._TYPE_SUFFIXES.get(metric.type, [])
            )
        return result

    def collect(self) -> Iterable[Metric]:
        """Yields metrics from the collectors in the registry."""
        collectors = None
        ti = None
        # Snapshot state under the lock; the collectors themselves are
        # invoked after the lock has been released.
        with self._lock:
            collectors = copy.copy(self._collector_to_names)
            if self._target_info:
                ti = self._target_info_metric()
        if ti:
            yield ti
        for collector in collectors:
            yield from collector.collect()

    def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
        """Returns object that only collects some metrics.

        Returns an object which upon collect() will return
        only samples with the given names.

        Intended usage is:
            generate_latest(REGISTRY.restricted_registry(['a_timeseries']), escaping)

        Experimental."""
        return RestrictedRegistry(set(names), self)

    def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
        """Set, replace or clear the labels of the ``target_info`` metric.

        Truthy labels reserve the name 'target_info' in the registry; falsy
        labels release it again if target info was previously set.

        Raises:
            ValueError: If no target info is currently set and a registered
                collector already claims the name 'target_info'.
        """
        with self._lock:
            if labels:
                if not self._target_info and 'target_info' in self._names_to_collectors:
                    raise ValueError('CollectorRegistry already contains a target_info metric')
                # Placeholder entry so register() rejects collectors that
                # would produce a clashing 'target_info' timeseries.
                self._names_to_collectors['target_info'] = _EmptyCollector()
            elif self._target_info:
                self._names_to_collectors.pop('target_info', None)
            self._target_info = labels

    def get_target_info(self) -> Optional[Dict[str, str]]:
        """Return the currently stored target info labels (not a copy)."""
        with self._lock:
            return self._target_info

    def _target_info_metric(self):
        """Build the 'target' info metric from the stored target info labels."""
        m = Metric('target', 'Target metadata', 'info')
        m.add_sample('target_info', self._target_info, 1)
        return m

    def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
        """Returns the sample value, or None if not found.

        This is inefficient, and intended only for use in unittests.
        """
        if labels is None:
            labels = {}
        for metric in self.collect():
            for s in metric.samples:
                if s.name == name and s.labels == labels:
                    return s.value
        return None

140 

141 

class RestrictedRegistry:
    """View over a registry that only emits samples with selected names."""

    def __init__(self, names: Iterable[str], registry: CollectorRegistry):
        """Remember the wanted sample names and the registry to read from."""
        self._name_set = set(names)
        self._registry = registry

    def collect(self) -> Iterable[Metric]:
        """Yield metrics restricted to the configured sample names.

        The target info metric comes first, if it was requested and the
        underlying registry has target info set. Each collector owning a
        requested name is then collected once, and every metric is narrowed
        through ``_restricted_metric``; falsy results are dropped.
        """
        parent = self._registry
        wanted = self._name_set
        # Resolve names to collectors under the registry lock; the collectors
        # themselves run after the lock is released.
        with parent._lock:
            if 'target_info' in wanted and parent._target_info:
                info_metric = parent._target_info_metric()
            else:
                info_metric = None
            by_name = parent._names_to_collectors
            selected = {
                by_name[name]
                for name in wanted
                if name != 'target_info' and name in by_name
            }
        if info_metric:
            yield info_metric
        for source in selected:
            for candidate in source.collect():
                narrowed = candidate._restricted_metric(wanted)
                if narrowed:
                    yield narrowed

163 

164 

# Module-level registry instance, created with auto_describe enabled so that
# collectors lacking a describe() method have collect() called at registration
# time to determine the names they produce.
REGISTRY = CollectorRegistry(auto_describe=True)