Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/openmetrics/exposition.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

186 statements  

1#!/usr/bin/env python 

2 

3from io import StringIO 

4from sys import maxunicode 

5from typing import Callable 

6 

7from ..utils import floatToGoString, parse_version 

8from ..validation import ( 

9 _is_valid_legacy_labelname, _is_valid_legacy_metric_name, 

10) 

11 

# Content-Type values for the OpenMetrics text format.
CONTENT_TYPE_LATEST = 'application/openmetrics-text; version=1.0.0; charset=utf-8'
"""Content type of the latest OpenMetrics 1.0 text format"""
CONTENT_TYPE_LATEST_2_0 = 'application/openmetrics-text; version=2.0.0; charset=utf-8'
"""Content type of the OpenMetrics 2.0 text format"""
# NOTE(review): not referenced anywhere in this file; presumably the name of
# the content-type parameter that carries the escaping scheme -- confirm
# against the callers that negotiate the format.
ESCAPING_HEADER_TAG = 'escaping'


# Escaping scheme identifiers understood by generate_latest() and the
# escape_* helpers in this module.
# Names are kept as-is (only backslash, newline and double-quote are
# backslash-escaped); names outside the legacy charset are quoted.
ALLOWUTF8 = 'allow-utf-8'
# Every character outside the legacy charset is replaced with '_'.
UNDERSCORES = 'underscores'
# '_' becomes '__', '.' becomes '_dot_', other invalid characters become '__'.
DOTS = 'dots'
# Name is prefixed with 'U__'; '_' becomes '__' and invalid characters become
# '_<hex code point>_'.
VALUES = 'values'

23 

24 

25def _is_valid_exemplar_metric(metric, sample): 

26 if metric.type == 'counter' and sample.name.endswith('_total'): 

27 return True 

28 if metric.type in ('gaugehistogram') and sample.name.endswith('_bucket'): 

29 return True 

30 if metric.type in ('histogram') and sample.name.endswith('_bucket') or sample.name == metric.name: 

31 return True 

32 return False 

33 

34 

def _compose_exemplar_string(metric, sample, exemplar):
    """Build the ``' # {labels} value [timestamp]'`` suffix for an exemplar.

    Labels are emitted sorted by name; backslash, newline and double-quote in
    label values are backslash-escaped. The timestamp is appended only when
    the exemplar has one.

    Raises:
        ValueError: if ``_is_valid_exemplar_metric`` rejects the sample.
    """
    if not _is_valid_exemplar_metric(metric, sample):
        raise ValueError(f"Metric {metric.name} has exemplars, but is not a histogram bucket or counter")

    label_pairs = []
    for label_name, label_value in sorted(exemplar.labels.items()):
        escaped_value = (
            label_value
            .replace('\\', r'\\')
            .replace('\n', r'\n')
            .replace('"', r'\"')
        )
        label_pairs.append(f'{label_name}="{escaped_value}"')
    labels = '{' + ','.join(label_pairs) + '}'

    if exemplar.timestamp is None:
        return f' # {labels} {floatToGoString(exemplar.value)}'
    return f' # {labels} {floatToGoString(exemplar.value)} {exemplar.timestamp}'

56 

57 

def _format_native_histogram(nh):
    """Render a native histogram as ``{count:..,sum:..,schema:..,...}``.

    The span and delta fields for a sign are emitted only when that sign has
    spans.
    """
    fields = [
        f'count:{nh.count_value}',
        f'sum:{nh.sum_value}',
        f'schema:{nh.schema}',
        f'zero_threshold:{nh.zero_threshold}',
        f'zero_count:{nh.zero_count}',
    ]
    if nh.neg_spans:
        spans = ','.join(f'{span[0]}:{span[1]}' for span in nh.neg_spans)
        deltas = ','.join(str(delta) for delta in nh.neg_deltas)
        fields.append(f'negative_spans:[{spans}]')
        fields.append(f'negative_deltas:[{deltas}]')
    if nh.pos_spans:
        spans = ','.join(f'{span[0]}:{span[1]}' for span in nh.pos_spans)
        deltas = ','.join(f'{delta}' for delta in nh.pos_deltas)
        fields.append(f'positive_spans:[{spans}]')
        fields.append(f'positive_deltas:[{deltas}]')
    return '{' + ','.join(fields) + '}'


def generate_latest(registry, escaping=UNDERSCORES, version="1.0.0"):
    """Return the metrics from the registry in OpenMetrics text format.

    Args:
        registry: object whose ``collect()`` yields metrics exposing ``name``,
            ``documentation``, ``type``, ``unit`` and ``samples``.
        escaping: one of ALLOWUTF8, UNDERSCORES, DOTS or VALUES; selects how
            metric and label names outside the legacy character set are
            written.
        version: format version string; samples carrying a native histogram
            are skipped when it parses to less than (2, 0, 0).

    Returns:
        The exposition as UTF-8 encoded bytes, terminated by ``# EOF``.

    Raises:
        Exception: anything raised while rendering a metric is re-raised with
            that metric appended to the exception's ``args``.
    """
    output = []
    for metric in registry.collect():
        try:
            # The escaped family name is shared by the HELP/TYPE/UNIT lines.
            family_name = escape_metric_name(metric.name, escaping)
            output.append('# HELP {} {}\n'.format(
                family_name, _escape(metric.documentation, ALLOWUTF8, _is_legacy_labelname_rune)))
            output.append(f'# TYPE {family_name} {metric.type}\n')
            if metric.unit:
                output.append(f'# UNIT {family_name} {metric.unit}\n')
            for s in metric.samples:
                # In ALLOWUTF8 mode a name outside the legacy charset cannot
                # stand bare at the start of the line; it is written quoted
                # as the first item inside the braces instead.
                name_in_braces = escaping == ALLOWUTF8 and not _is_valid_legacy_metric_name(s.name)
                if name_in_braces:
                    labelstr = escape_metric_name(s.name, escaping)
                    if s.labels:
                        labelstr += ','
                else:
                    labelstr = ''

                if s.labels:
                    # Label values always support UTF-8
                    labelstr += ','.join(
                        '{}="{}"'.format(
                            escape_label_name(k, escaping), _escape(v, ALLOWUTF8, _is_legacy_labelname_rune))
                        for k, v in sorted(s.labels.items()))
                if labelstr:
                    labelstr = "{" + labelstr + "}"

                exemplarstr = _compose_exemplar_string(metric, s, s.exemplar) if s.exemplar else ''
                timestamp = f' {s.timestamp}' if s.timestamp is not None else ''

                # Skip native histogram samples entirely if version < 2.0.0
                if s.native_histogram and parse_version(version) < (2, 0, 0):
                    continue

                if s.native_histogram:
                    value = _format_native_histogram(s.native_histogram)
                    if s.native_histogram.nh_exemplars:
                        for nh_ex in s.native_histogram.nh_exemplars:
                            exemplarstr += _compose_exemplar_string(metric, s, nh_ex)
                elif s.value is not None:
                    value = floatToGoString(s.value)
                else:
                    value = ''

                if name_in_braces:
                    output.append(f'{labelstr} {value}{timestamp}{exemplarstr}\n')
                else:
                    # Escape the sample name with the same metric-name rules
                    # as the family header above. Escaping it with the
                    # label-name rules rewrote ':' and, under VALUES, added a
                    # 'U__' prefix to already valid names, so samples no
                    # longer matched their HELP/TYPE lines.
                    sample_name = escape_metric_name(s.name, escaping)
                    output.append(f'{sample_name}{labelstr} {value}{timestamp}{exemplarstr}\n')
        except Exception as exception:
            # Attach the offending metric so callers can tell which one failed.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

    output.append('# EOF\n')
    return ''.join(output).encode('utf-8')

171 

172 

def escape_metric_name(s: str, escaping: str = UNDERSCORES) -> str:
    """Escape a metric name according to ``escaping``.

    * ALLOWUTF8: the name is backslash-escaped, and additionally wrapped in
      double quotes iff it does not conform to the legacy Prometheus
      character set.
    * UNDERSCORES / VALUES: legacy-valid names are returned unchanged, all
      others are escaped.
    * DOTS: the name is always escaped.

    Empty names and unrecognised escaping values are returned unchanged.
    """
    if not s:
        return s

    if escaping == ALLOWUTF8:
        if _is_valid_legacy_metric_name(s):
            return _escape(s, escaping, _is_legacy_metric_rune)
        return '"{}"'.format(_escape(s, escaping, _is_legacy_metric_rune))

    if escaping == DOTS:
        return _escape(s, escaping, _is_legacy_metric_rune)

    if escaping in (UNDERSCORES, VALUES):
        if not _is_valid_legacy_metric_name(s):
            return _escape(s, escaping, _is_legacy_metric_rune)
        return s

    return s

194 

195 

def escape_label_name(s: str, escaping: str = UNDERSCORES) -> str:
    """Escape a label name according to ``escaping``.

    * ALLOWUTF8: the name is backslash-escaped, and additionally wrapped in
      double quotes iff it does not conform to the legacy Prometheus
      character set.
    * UNDERSCORES / VALUES: legacy-valid names are returned unchanged, all
      others are escaped.
    * DOTS: the name is always escaped.

    Empty names and unrecognised escaping values are returned unchanged.
    """
    if not s:
        return s

    if escaping == ALLOWUTF8:
        if _is_valid_legacy_labelname(s):
            return _escape(s, escaping, _is_legacy_labelname_rune)
        return '"{}"'.format(_escape(s, escaping, _is_legacy_labelname_rune))

    if escaping == DOTS:
        return _escape(s, escaping, _is_legacy_labelname_rune)

    if escaping in (UNDERSCORES, VALUES):
        if not _is_valid_legacy_labelname(s):
            return _escape(s, escaping, _is_legacy_labelname_rune)
        return s

    return s

217 

218 

def _escape(s: str, escaping: str, valid_rune_fn: Callable[[str, int], bool]) -> str:
    """Rewrite ``s`` according to the ``escaping`` scheme.

    * ALLOWUTF8: backslash-escape backslash, newline and double-quote.
    * UNDERSCORES: replace each character rejected by ``valid_rune_fn``
      with ``_``.
    * DOTS: ``_`` becomes ``__``, ``.`` becomes ``_dot_``, any other rejected
      character becomes ``__``.
    * VALUES: prefix with ``U__``; ``_`` becomes ``__``; a rejected character
      becomes ``_<hex code point>_``, or ``_FFFD_`` when ``_is_valid_utf8``
      rejects it.

    Any other ``escaping`` value returns ``s`` unchanged.

    valid_rune_fn takes the input character and its index in the containing string.
    """
    if escaping == ALLOWUTF8:
        return s.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')

    if escaping == UNDERSCORES:
        return ''.join(
            ch if valid_rune_fn(ch, pos) else '_'
            for pos, ch in enumerate(s)
        )

    if escaping == DOTS:
        pieces = []
        for pos, ch in enumerate(s):
            if ch == '.':
                pieces.append('_dot_')
            elif ch != '_' and valid_rune_fn(ch, pos):
                pieces.append(ch)
            else:
                # Both a literal underscore and any invalid character
                # are written as a double underscore.
                pieces.append('__')
        return ''.join(pieces)

    if escaping == VALUES:
        pieces = ['U__']
        for pos, ch in enumerate(s):
            if ch == '_':
                pieces.append('__')
            elif valid_rune_fn(ch, pos):
                pieces.append(ch)
            elif _is_valid_utf8(ch):
                pieces.append('_' + format(ord(ch), 'x') + '_')
            else:
                pieces.append('_FFFD_')
        return ''.join(pieces)

    return s

261 

262 

def _is_legacy_metric_rune(b: str, i: int) -> bool:
    """Return True if ``b`` at index ``i`` is allowed in a legacy metric name.

    Accepts everything a legacy label name accepts, plus the colon.
    """
    if _is_legacy_labelname_rune(b, i):
        return True
    return b == ':'

265 

266 

267def _is_legacy_labelname_rune(b: str, i: int) -> bool: 

268 if len(b) != 1: 

269 raise ValueError("Input 'b' must be a single character.") 

270 return ( 

271 ('a' <= b <= 'z') 

272 or ('A' <= b <= 'Z') 

273 or (b == '_') 

274 or ('0' <= b <= '9' and i > 0) 

275 ) 

276 

277 

278_SURROGATE_MIN = 0xD800 

279_SURROGATE_MAX = 0xDFFF 

280 

281 

282def _is_valid_utf8(s: str) -> bool: 

283 if 0 <= ord(s) < _SURROGATE_MIN: 

284 return True 

285 if _SURROGATE_MAX < ord(s) <= maxunicode: 

286 return True 

287 return False