Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prometheus_client/registry.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

110 statements  

1from abc import ABC, abstractmethod 

2import copy 

3from threading import Lock 

4from typing import Dict, Iterable, List, Optional 

5 

6from .metrics_core import Metric 

7 

8 

9# Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8. 

class Collector(ABC):
    """Abstract base class for anything that can produce metrics.

    Concrete subclasses must implement ``collect``.
    """

    @abstractmethod
    def collect(self) -> Iterable[Metric]:
        """Return the metrics this collector exposes."""

14 

15 

class _EmptyCollector(Collector):
    """Collector that never yields any metrics.

    CollectorRegistry.set_target_info stores an instance of this class
    under the 'target_info' name in its name-to-collector mapping.
    """

    def collect(self) -> Iterable[Metric]:
        """Return a new, empty list."""
        nothing: List[Metric] = []
        return nothing

19 

20 

class CollectorRegistry(Collector):
    """Registry of metric collectors.

    Every collector must offer a no-argument ``collect`` method returning a
    list of Metric objects; those metrics should be consistent with the
    Prometheus exposition formats.
    """

    def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None):
        """Create an empty registry.

        Args:
            auto_describe: when true, a collector without ``describe`` has
                its ``collect`` method called at registration time to
                discover the names it produces.
            target_info: optional labels passed to ``set_target_info``.
        """
        self._lock = Lock()
        self._auto_describe = auto_describe
        self._names_to_collectors: Dict[str, Collector] = {}
        self._collector_to_names: Dict[Collector, List[str]] = {}
        self._target_info: Optional[Dict[str, str]] = {}
        self.set_target_info(target_info)

    def register(self, collector: Collector) -> None:
        """Add a collector to the registry.

        Raises:
            ValueError: if one of the collector's timeseries names is
                already present in this registry.
        """
        with self._lock:
            produced = self._get_names(collector)
            clashes = set(self._names_to_collectors).intersection(produced)
            if clashes:
                raise ValueError(
                    'Duplicated timeseries in CollectorRegistry: {}'.format(
                        clashes))
            # Claim every produced name for this collector in one step.
            self._names_to_collectors.update(dict.fromkeys(produced, collector))
            self._collector_to_names[collector] = produced

    def unregister(self, collector: Collector) -> None:
        """Remove a collector from the registry.

        Raises:
            KeyError: if the collector is not currently registered.
        """
        with self._lock:
            owned_names = self._collector_to_names[collector]
            for owned in owned_names:
                del self._names_to_collectors[owned]
            del self._collector_to_names[collector]

    def _get_names(self, collector):
        """Get names of timeseries the collector produces and clashes with."""
        # Prefer an explicit describe(); collect() is only a fallback when
        # auto describe is enabled.
        describe = getattr(collector, 'describe', None)
        if not describe and self._auto_describe:
            describe = collector.collect
        if not describe:
            return []

        suffixes_by_type = {
            'counter': ['_total', '_created'],
            'summary': ['_sum', '_count', '_created'],
            'histogram': ['_bucket', '_sum', '_count', '_created'],
            'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
            'info': ['_info'],
        }
        names = []
        for metric in describe():
            # The bare metric name first, then one entry per type suffix.
            names.append(metric.name)
            names.extend(
                metric.name + suffix
                for suffix in suffixes_by_type.get(metric.type, [])
            )
        return names

    def collect(self) -> Iterable[Metric]:
        """Yields metrics from the collectors in the registry."""
        # Snapshot the state under the lock; the collectors themselves are
        # invoked after the lock has been released.
        with self._lock:
            snapshot = copy.copy(self._collector_to_names)
            target_metric = self._target_info_metric() if self._target_info else None
        if target_metric:
            yield target_metric
        for collector in snapshot:
            yield from collector.collect()

    def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry":
        """Returns object that only collects some metrics.

        The returned object's collect() will return only samples with the
        given names.

        Intended usage is:
            generate_latest(REGISTRY.restricted_registry(['a_timeseries']))

        Experimental."""
        return RestrictedRegistry(set(names), self)

    def set_target_info(self, labels: Optional[Dict[str, str]]) -> None:
        """Set, replace or clear the registry's target info labels.

        Raises:
            ValueError: if labels are given, no target info is currently
                set, and 'target_info' is already a registered name.
        """
        with self._lock:
            had_target_info = bool(self._target_info)
            if labels:
                name_taken = 'target_info' in self._names_to_collectors
                if name_taken and not had_target_info:
                    raise ValueError('CollectorRegistry already contains a target_info metric')
                # Reserve the name with a collector that yields nothing.
                self._names_to_collectors['target_info'] = _EmptyCollector()
            elif had_target_info:
                self._names_to_collectors.pop('target_info', None)
            self._target_info = labels

    def get_target_info(self) -> Optional[Dict[str, str]]:
        """Return the labels last stored by ``set_target_info``."""
        with self._lock:
            return self._target_info

    def _target_info_metric(self):
        """Build the 'target' info metric from the stored target info labels."""
        metric = Metric('target', 'Target metadata', 'info')
        metric.add_sample('target_info', self._target_info, 1)
        return metric

    def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]:
        """Returns the sample value, or None if not found.

        This is inefficient, and intended only for use in unittests.
        """
        wanted_labels = {} if labels is None else labels
        matching_values = (
            sample.value
            for metric in self.collect()
            for sample in metric.samples
            if sample.name == name and sample.labels == wanted_labels
        )
        # First match wins; None when nothing matches.
        return next(matching_values, None)

143 

144 

class RestrictedRegistry:
    """View over a CollectorRegistry limited to a given set of names."""

    def __init__(self, names: Iterable[str], registry: CollectorRegistry):
        """Store the wanted names (as a set) and the registry to read from."""
        self._registry = registry
        self._name_set = set(names)

    def collect(self) -> Iterable[Metric]:
        """Yield restricted metrics from the collectors owning the wanted names."""
        registry = self._registry
        wanted = self._name_set
        target_metric = None
        # Resolve names to collectors under the registry lock; the
        # collectors are invoked after the lock has been released.
        with registry._lock:
            if 'target_info' in wanted and registry._target_info:
                target_metric = registry._target_info_metric()
            known = registry._names_to_collectors
            selected = {
                known[name]
                for name in wanted
                if name != 'target_info' and name in known
            }
        if target_metric:
            yield target_metric
        for collector in selected:
            for metric in collector.collect():
                restricted = metric._restricted_metric(wanted)
                if restricted:
                    yield restricted

166 

167 

# Module-level registry instance, created with auto_describe enabled so that
# collectors lacking a describe() method have collect() called at
# registration time to discover their names.
REGISTRY = CollectorRegistry(auto_describe=True)