1#!/usr/bin/env python
2
3
4from ..utils import floatToGoString
5from ..validation import (
6 _is_valid_legacy_labelname, _is_valid_legacy_metric_name,
7)
8
# Media-type string for this module's output format; the literal pins the
# OpenMetrics text format version (1.0.0) and the UTF-8 charset.
CONTENT_TYPE_LATEST = 'application/openmetrics-text; version=1.0.0; charset=utf-8'
"""Content type of the latest OpenMetrics text format"""
11
12
13def _is_valid_exemplar_metric(metric, sample):
14 if metric.type == 'counter' and sample.name.endswith('_total'):
15 return True
16 if metric.type in ('gaugehistogram') and sample.name.endswith('_bucket'):
17 return True
18 if metric.type in ('histogram') and sample.name.endswith('_bucket') or sample.name == metric.name:
19 return True
20 return False
21
22
def generate_latest(registry):
    """Render every metric from ``registry`` in the latest text format.

    For each metric yielded by ``registry.collect()`` this emits a ``# HELP``
    line, a ``# TYPE`` line, a ``# UNIT`` line when the metric has a unit, and
    then one line per sample.  The output is terminated by ``# EOF``.

    Args:
        registry: object whose ``collect()`` yields metrics exposing ``name``,
            ``documentation``, ``type``, ``unit`` and ``samples``.

    Returns:
        The complete exposition, encoded as UTF-8 bytes.

    Raises:
        ValueError: if a sample has an exemplar but
            ``_is_valid_exemplar_metric`` rejects the metric/sample pair.
        Exception: anything raised while rendering a metric is re-raised with
            that metric appended to the exception's ``args``.
    """
    lines = []
    for metric in registry.collect():
        try:
            family = escape_metric_name(metric.name)
            lines.append(f'# HELP {family} {_escape(metric.documentation)}\n')
            lines.append(f'# TYPE {family} {metric.type}\n')
            if metric.unit:
                lines.append(f'# UNIT {family} {metric.unit}\n')

            for sample in metric.samples:
                legacy_name = _is_valid_legacy_metric_name(sample.name)

                # A non-legacy sample name cannot lead the line, so its quoted
                # form becomes the first entry inside the braces instead.
                inner = '' if legacy_name else escape_metric_name(sample.name)
                if sample.labels:
                    if not legacy_name:
                        inner += ', '
                    inner += ','.join(
                        f'{escape_label_name(label)}="{_escape(value)}"'
                        for label, value in sorted(sample.labels.items()))
                label_block = '{' + inner + '}' if inner else ''

                exemplar = sample.exemplar
                if exemplar:
                    if not _is_valid_exemplar_metric(metric, sample):
                        raise ValueError(f"Metric {metric.name} has exemplars, but is not a histogram bucket or counter")
                    # Exemplar label names are written as-is; only the values
                    # are backslash-escaped.
                    pairs = ','.join(
                        f'{label}="{_escape(value)}"'
                        for label, value in sorted(exemplar.labels.items()))
                    exemplar_part = f' # {{{pairs}}} {floatToGoString(exemplar.value)}'
                    if exemplar.timestamp is not None:
                        exemplar_part += f' {exemplar.timestamp}'
                else:
                    exemplar_part = ''

                stamp = '' if sample.timestamp is None else f' {sample.timestamp}'

                # Legacy names are printed bare before the label block; other
                # names already live inside the braces built above.
                prefix = sample.name if legacy_name else ''
                lines.append(
                    f'{prefix}{label_block} {floatToGoString(sample.value)}'
                    f'{stamp}{exemplar_part}\n')
        except Exception as exception:
            # Attach the offending metric so callers can see what failed.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

    lines.append('# EOF\n')
    return ''.join(lines).encode('utf-8')
95
96
def escape_metric_name(s: str) -> str:
    """Return ``s`` in a form usable as a metric name in the output.

    A name accepted by ``_is_valid_legacy_metric_name`` is returned unchanged.
    Any other name is backslash-escaped and wrapped in double quotes.
    """
    if not _is_valid_legacy_metric_name(s):
        return f'"{_escape(s)}"'
    return s
104
105
def escape_label_name(s: str) -> str:
    """Return ``s`` in a form usable as a label name in the output.

    A name accepted by ``_is_valid_legacy_labelname`` is returned unchanged.
    Any other name is backslash-escaped and wrapped in double quotes.
    """
    return s if _is_valid_legacy_labelname(s) else f'"{_escape(s)}"'
113
114
115def _escape(s: str) -> str:
116 """Performs backslash escaping on backslash, newline, and double-quote characters."""
117 return s.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')