1#!/usr/bin/env python
2
3from io import StringIO
4from sys import maxunicode
5from typing import Callable
6
7from ..utils import floatToGoString, parse_version
8from ..validation import (
9 _is_valid_legacy_labelname, _is_valid_legacy_metric_name,
10)
11
CONTENT_TYPE_LATEST = 'application/openmetrics-text; version=1.0.0; charset=utf-8'
"""Content type of the latest OpenMetrics 1.0 text format"""
CONTENT_TYPE_LATEST_2_0 = 'application/openmetrics-text; version=2.0.0; charset=utf-8'
"""Content type of the OpenMetrics 2.0 text format"""
# NOTE(review): not referenced anywhere in this module; presumably the name of
# the content-type parameter that carries the escaping scheme - confirm
# against the callers that negotiate the content type.
ESCAPING_HEADER_TAG = 'escaping'


# Escaping scheme identifiers accepted by escape_metric_name(),
# escape_label_name() and _escape() in this module.
# ALLOWUTF8: names keep their characters; only backslash, newline and
# double-quote are backslash-escaped.
ALLOWUTF8 = 'allow-utf-8'
# UNDERSCORES: every character rejected by the rune predicate becomes '_'.
UNDERSCORES = 'underscores'
# DOTS: '_' becomes '__', '.' becomes '_dot_', any other rejected character
# becomes '__'.
DOTS = 'dots'
# VALUES: output is prefixed with 'U__', '_' becomes '__' and rejected
# characters become '_<hex code point>_' ('_FFFD_' for surrogates).
VALUES = 'values'
23
24
def _is_valid_exemplar_metric(metric, sample):
    """Reports whether a sample of the given metric may carry an exemplar.

    Exemplars are accepted on:
      * counter samples whose name ends in ``_total``;
      * histogram and gaugehistogram samples whose name ends in ``_bucket``;
      * histogram and gaugehistogram samples named exactly like the metric
        (presumably native histogram samples, which have no suffix -
        confirm against the collectors that produce them).

    Any other metric type / sample name combination is rejected.
    """
    name = sample.name
    # Compare the type with == / a real tuple: `x in ('histogram')` is a
    # substring test against a plain string, so types such as 'gauge' or ''
    # used to slip through.
    if metric.type == 'counter':
        return name.endswith('_total')
    if metric.type in ('histogram', 'gaugehistogram'):
        # Both alternatives are gated on the metric type; previously operator
        # precedence let `name == metric.name` match for every metric type.
        return name.endswith('_bucket') or name == metric.name
    return False
33
34
def _compose_exemplar_string(metric, sample, exemplar):
    """Renders one exemplar as the ``' # {labels} value [timestamp]'`` suffix.

    Labels are emitted sorted by name, with backslash, newline and
    double-quote backslash-escaped in their values. The timestamp is only
    appended when the exemplar has one.

    Raises:
        ValueError: if the sample is rejected by _is_valid_exemplar_metric.
    """
    if not _is_valid_exemplar_metric(metric, sample):
        raise ValueError(f"Metric {metric.name} has exemplars, but is not a histogram bucket or counter")

    pairs = []
    for key, raw_value in sorted(exemplar.labels.items()):
        escaped_value = raw_value.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')
        pairs.append(f'{key}="{escaped_value}"')
    label_block = '{' + ','.join(pairs) + '}'

    rendered = f' # {label_block} {floatToGoString(exemplar.value)}'
    if exemplar.timestamp is not None:
        rendered += f' {exemplar.timestamp}'
    return rendered
56
57
def _compose_native_histogram_string(native_histogram):
    """Renders a native histogram as its ``{count:...,sum:...,...}`` sample value.

    The span and delta lists of a sign are only written when that sign has
    at least one span.
    """
    fields = [
        f'count:{native_histogram.count_value}',
        f'sum:{native_histogram.sum_value}',
        f'schema:{native_histogram.schema}',
        f'zero_threshold:{native_histogram.zero_threshold}',
        f'zero_count:{native_histogram.zero_count}',
    ]
    if native_histogram.neg_spans:
        spans = ','.join(f'{span[0]}:{span[1]}' for span in native_histogram.neg_spans)
        deltas = ','.join(str(delta) for delta in native_histogram.neg_deltas)
        fields.append(f'negative_spans:[{spans}]')
        fields.append(f'negative_deltas:[{deltas}]')
    if native_histogram.pos_spans:
        spans = ','.join(f'{span[0]}:{span[1]}' for span in native_histogram.pos_spans)
        deltas = ','.join(str(delta) for delta in native_histogram.pos_deltas)
        fields.append(f'positive_spans:[{spans}]')
        fields.append(f'positive_deltas:[{deltas}]')
    return '{' + ','.join(fields) + '}'


def generate_latest(registry, escaping=UNDERSCORES, version="1.0.0"):
    """Returns the metrics from the registry in OpenMetrics text format as UTF-8 bytes.

    Args:
        registry: object whose ``collect()`` yields the metrics to expose.
        escaping: escaping scheme applied to metric and label names
            (ALLOWUTF8, UNDERSCORES, DOTS or VALUES).
        version: exposition format version; native histogram samples are
            skipped when it parses to less than (2, 0, 0).

    Returns:
        The encoded exposition, terminated by a ``# EOF`` line.

    Raises:
        Any exception raised while rendering a metric is re-raised with the
        offending metric appended to its ``args``.
    """
    output = []
    for metric in registry.collect():
        try:
            family_name = escape_metric_name(metric.name, escaping)
            output.append('# HELP {} {}\n'.format(
                family_name, _escape(metric.documentation, ALLOWUTF8, _is_legacy_labelname_rune)))
            output.append(f'# TYPE {family_name} {metric.type}\n')
            if metric.unit:
                output.append(f'# UNIT {family_name} {metric.unit}\n')
            for s in metric.samples:
                # Under ALLOWUTF8 a name outside the legacy charset cannot
                # precede the braces; it moves inside them as a quoted string.
                quoted_name = escaping == ALLOWUTF8 and not _is_valid_legacy_metric_name(s.name)

                label_parts = []
                if quoted_name:
                    label_parts.append(escape_metric_name(s.name, escaping))
                if s.labels:
                    # Label values always support UTF-8
                    label_parts.extend(
                        '{}="{}"'.format(
                            escape_label_name(k, escaping), _escape(v, ALLOWUTF8, _is_legacy_labelname_rune))
                        for k, v in sorted(s.labels.items()))
                labelstr = ','.join(label_parts)
                if labelstr:
                    labelstr = '{' + labelstr + '}'

                exemplarstr = _compose_exemplar_string(metric, s, s.exemplar) if s.exemplar else ''
                timestamp = f' {s.timestamp}' if s.timestamp is not None else ''

                # Skip native histogram samples entirely if version < 2.0.0
                if s.native_histogram and parse_version(version) < (2, 0, 0):
                    continue

                if s.native_histogram:
                    value = _compose_native_histogram_string(s.native_histogram)
                    for nh_ex in s.native_histogram.nh_exemplars or ():
                        exemplarstr += _compose_exemplar_string(metric, s, nh_ex)
                elif s.value is not None:
                    value = floatToGoString(s.value)
                else:
                    value = ''

                if quoted_name:
                    output.append('{} {}{}{}\n'.format(
                        labelstr,
                        value,
                        timestamp,
                        exemplarstr,
                    ))
                else:
                    # Escape the sample name exactly like the family name in
                    # the HELP/TYPE/UNIT lines. Escaping it with the label-name
                    # predicate rewrote ':' and, under VALUES, prefixed even
                    # legacy-valid names with 'U__', so samples no longer
                    # matched their own metric family.
                    output.append('{}{} {}{}{}\n'.format(
                        escape_metric_name(s.name, escaping),
                        labelstr,
                        value,
                        timestamp,
                        exemplarstr,
                    ))
        except Exception as exception:
            # Attach the metric so the caller can tell which one failed.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

    output.append('# EOF\n')
    return ''.join(output).encode('utf-8')
171
172
def escape_metric_name(s: str, escaping: str = UNDERSCORES) -> str:
    """Escapes a metric name according to the given escaping scheme.

    * ALLOWUTF8: backslash-escapes the name and wraps it in double quotes
      iff it is not a valid legacy metric name.
    * UNDERSCORES / VALUES: valid legacy names pass through untouched,
      anything else is escaped.
    * DOTS: the name is always escaped.

    Empty names and unknown schemes are returned unchanged.
    """
    if len(s) == 0:
        return s

    if escaping == DOTS:
        return _escape(s, escaping, _is_legacy_metric_rune)

    if escaping == ALLOWUTF8:
        needs_quotes = not _is_valid_legacy_metric_name(s)
        escaped = _escape(s, escaping, _is_legacy_metric_rune)
        return f'"{escaped}"' if needs_quotes else escaped

    if escaping in (UNDERSCORES, VALUES):
        if _is_valid_legacy_metric_name(s):
            return s
        return _escape(s, escaping, _is_legacy_metric_rune)

    return s
194
195
def escape_label_name(s: str, escaping: str = UNDERSCORES) -> str:
    """Escapes a label name according to the given escaping scheme.

    * ALLOWUTF8: backslash-escapes the name and wraps it in double quotes
      iff it is not a valid legacy label name.
    * UNDERSCORES / VALUES: valid legacy names pass through untouched,
      anything else is escaped.
    * DOTS: the name is always escaped.

    Empty names and unknown schemes are returned unchanged.
    """
    if len(s) == 0:
        return s

    if escaping == DOTS:
        return _escape(s, escaping, _is_legacy_labelname_rune)

    if escaping == ALLOWUTF8:
        needs_quotes = not _is_valid_legacy_labelname(s)
        escaped = _escape(s, escaping, _is_legacy_labelname_rune)
        return f'"{escaped}"' if needs_quotes else escaped

    if escaping in (UNDERSCORES, VALUES):
        if _is_valid_legacy_labelname(s):
            return s
        return _escape(s, escaping, _is_legacy_labelname_rune)

    return s
217
218
def _escape(s: str, escaping: str, valid_rune_fn: Callable[[str, int], bool]) -> str:
    """Rewrites ``s`` according to the given escaping scheme.

    * ALLOWUTF8: backslash-escapes backslash, newline and double-quote.
    * UNDERSCORES: replaces every character rejected by valid_rune_fn with '_'.
    * DOTS: '_' becomes '__', '.' becomes '_dot_', other rejected characters
      become '__'.
    * VALUES: prefixes 'U__'; '_' becomes '__', rejected characters become
      '_<hex code point>_', or '_FFFD_' when _is_valid_utf8 rejects them.

    Any other scheme returns ``s`` unchanged.

    valid_rune_fn takes the input character and its index in the containing string."""
    if escaping == ALLOWUTF8:
        return s.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')

    if escaping == UNDERSCORES:
        buf = StringIO()
        for idx, ch in enumerate(s):
            buf.write(ch if valid_rune_fn(ch, idx) else '_')
        return buf.getvalue()

    if escaping == DOTS:
        buf = StringIO()
        for idx, ch in enumerate(s):
            if ch == '.':
                buf.write('_dot_')
            elif ch == '_' or not valid_rune_fn(ch, idx):
                # Underscores are doubled without consulting valid_rune_fn.
                buf.write('__')
            else:
                buf.write(ch)
        return buf.getvalue()

    if escaping == VALUES:
        buf = StringIO()
        buf.write("U__")
        for idx, ch in enumerate(s):
            if ch == '_':
                token = "__"
            elif valid_rune_fn(ch, idx):
                token = ch
            elif _is_valid_utf8(ch):
                token = f'_{ord(ch):x}_'
            else:
                token = "_FFFD_"
            buf.write(token)
        return buf.getvalue()

    return s
261
262
def _is_legacy_metric_rune(b: str, i: int) -> bool:
    """Reports whether character ``b`` at index ``i`` is allowed in a legacy metric name.

    Accepts everything a legacy label name accepts, plus ':'.
    """
    if _is_legacy_labelname_rune(b, i):
        return True
    return b == ':'
265
266
def _is_legacy_labelname_rune(b: str, i: int) -> bool:
    """Reports whether character ``b`` at index ``i`` is allowed in a legacy label name.

    ASCII letters and '_' are allowed anywhere; ASCII digits are allowed
    everywhere except at index 0.

    Raises:
        ValueError: if ``b`` is not exactly one character long.
    """
    if len(b) != 1:
        raise ValueError("Input 'b' must be a single character.")
    if b == '_' or 'a' <= b <= 'z' or 'A' <= b <= 'Z':
        return True
    # Digits may not start a name.
    return '0' <= b <= '9' and i > 0
276
277
# Inclusive bounds of the UTF-16 surrogate code point range.
_SURROGATE_MIN = 0xD800
_SURROGATE_MAX = 0xDFFF


def _is_valid_utf8(s: str) -> bool:
    """Reports whether the single character ``s`` lies outside the surrogate range.

    Returns True for code points below _SURROGATE_MIN or above
    _SURROGATE_MAX (up to sys.maxunicode), False otherwise.
    """
    code_point = ord(s)
    below_surrogates = 0 <= code_point < _SURROGATE_MIN
    above_surrogates = _SURROGATE_MAX < code_point <= maxunicode
    return below_surrogates or above_surrogates