Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/openmetrics/exposition.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1#!/usr/bin/env python 

2 

3 

4from ..utils import floatToGoString 

5from ..validation import ( 

6 _is_valid_legacy_labelname, _is_valid_legacy_metric_name, 

7) 

8 

9CONTENT_TYPE_LATEST = 'application/openmetrics-text; version=1.0.0; charset=utf-8' 

10"""Content type of the latest OpenMetrics text format""" 

11 

12 

13def _is_valid_exemplar_metric(metric, sample): 

14 if metric.type == 'counter' and sample.name.endswith('_total'): 

15 return True 

16 if metric.type in ('gaugehistogram') and sample.name.endswith('_bucket'): 

17 return True 

18 if metric.type in ('histogram') and sample.name.endswith('_bucket') or sample.name == metric.name: 

19 return True 

20 return False 

21 

22 

23def generate_latest(registry): 

24 '''Returns the metrics from the registry in latest text format as a string.''' 

25 output = [] 

26 for metric in registry.collect(): 

27 try: 

28 mname = metric.name 

29 output.append('# HELP {} {}\n'.format( 

30 escape_metric_name(mname), _escape(metric.documentation))) 

31 output.append(f'# TYPE {escape_metric_name(mname)} {metric.type}\n') 

32 if metric.unit: 

33 output.append(f'# UNIT {escape_metric_name(mname)} {metric.unit}\n') 

34 for s in metric.samples: 

35 if not _is_valid_legacy_metric_name(s.name): 

36 labelstr = escape_metric_name(s.name) 

37 if s.labels: 

38 labelstr += ', ' 

39 else: 

40 labelstr = '' 

41 

42 if s.labels: 

43 items = sorted(s.labels.items()) 

44 labelstr += ','.join( 

45 ['{}="{}"'.format( 

46 escape_label_name(k), _escape(v)) 

47 for k, v in items]) 

48 if labelstr: 

49 labelstr = "{" + labelstr + "}" 

50 

51 if s.exemplar: 

52 if not _is_valid_exemplar_metric(metric, s): 

53 raise ValueError(f"Metric {metric.name} has exemplars, but is not a histogram bucket or counter") 

54 labels = '{{{0}}}'.format(','.join( 

55 ['{}="{}"'.format( 

56 k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')) 

57 for k, v in sorted(s.exemplar.labels.items())])) 

58 if s.exemplar.timestamp is not None: 

59 exemplarstr = ' # {} {} {}'.format( 

60 labels, 

61 floatToGoString(s.exemplar.value), 

62 s.exemplar.timestamp, 

63 ) 

64 else: 

65 exemplarstr = ' # {} {}'.format( 

66 labels, 

67 floatToGoString(s.exemplar.value), 

68 ) 

69 else: 

70 exemplarstr = '' 

71 timestamp = '' 

72 if s.timestamp is not None: 

73 timestamp = f' {s.timestamp}' 

74 if _is_valid_legacy_metric_name(s.name): 

75 output.append('{}{} {}{}{}\n'.format( 

76 s.name, 

77 labelstr, 

78 floatToGoString(s.value), 

79 timestamp, 

80 exemplarstr, 

81 )) 

82 else: 

83 output.append('{} {}{}{}\n'.format( 

84 labelstr, 

85 floatToGoString(s.value), 

86 timestamp, 

87 exemplarstr, 

88 )) 

89 except Exception as exception: 

90 exception.args = (exception.args or ('',)) + (metric,) 

91 raise 

92 

93 output.append('# EOF\n') 

94 return ''.join(output).encode('utf-8') 

95 

96 

97def escape_metric_name(s: str) -> str: 

98 """Escapes the metric name and puts it in quotes iff the name does not 

99 conform to the legacy Prometheus character set. 

100 """ 

101 if _is_valid_legacy_metric_name(s): 

102 return s 

103 return '"{}"'.format(_escape(s)) 

104 

105 

106def escape_label_name(s: str) -> str: 

107 """Escapes the label name and puts it in quotes iff the name does not 

108 conform to the legacy Prometheus character set. 

109 """ 

110 if _is_valid_legacy_labelname(s): 

111 return s 

112 return '"{}"'.format(_escape(s)) 

113 

114 

115def _escape(s: str) -> str: 

116 """Performs backslash escaping on backslash, newline, and double-quote characters.""" 

117 return s.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')