1import base64
2from contextlib import closing
3from functools import partial
4import gzip
5from http.server import BaseHTTPRequestHandler
6import os
7import socket
8from socketserver import ThreadingMixIn
9import ssl
10import sys
11import threading
12from typing import (
13 Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union,
14)
15from urllib.error import HTTPError
16from urllib.parse import parse_qs, quote_plus, urlparse
17from urllib.request import (
18 BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler,
19 Request,
20)
21from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
22
23from .openmetrics import exposition as openmetrics
24from .registry import Collector, REGISTRY
25from .utils import floatToGoString, parse_version
26
27try:
28 import snappy # type: ignore
29 SNAPPY_AVAILABLE = True
30except ImportError:
31 snappy = None # type: ignore
32 SNAPPY_AVAILABLE = False
33
34__all__ = (
35 'CONTENT_TYPE_LATEST',
36 'CONTENT_TYPE_PLAIN_0_0_4',
37 'CONTENT_TYPE_PLAIN_1_0_0',
38 'delete_from_gateway',
39 'generate_latest',
40 'instance_ip_grouping_key',
41 'make_asgi_app',
42 'make_wsgi_app',
43 'MetricsHandler',
44 'push_to_gateway',
45 'pushadd_to_gateway',
46 'start_http_server',
47 'start_wsgi_server',
48 'write_to_textfile',
49)
50
51CONTENT_TYPE_PLAIN_0_0_4 = 'text/plain; version=0.0.4; charset=utf-8'
52"""Content type of the compatibility format"""
53
54CONTENT_TYPE_PLAIN_1_0_0 = 'text/plain; version=1.0.0; charset=utf-8'
55"""Content type of the latest format"""
56
57CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
58CompressionType = Optional[Literal['gzip', 'snappy']]
59
60
61class _PrometheusRedirectHandler(HTTPRedirectHandler):
62 """
63 Allow additional methods (e.g. PUT) and data forwarding in redirects.
64
65 Use of this class constitute a user's explicit agreement to the
66 redirect responses the Prometheus client will receive when using it.
67 You should only use this class if you control or otherwise trust the
68 redirect behavior involved and are certain it is safe to full transfer
69 the original request (method and data) to the redirected URL. For
70 example, if you know there is a cosmetic URL redirect in front of a
71 local deployment of a Prometheus server, and all redirects are safe,
72 this is the class to use to handle redirects in that case.
73
74 The standard HTTPRedirectHandler does not forward request data nor
75 does it allow redirected PUT requests (which Prometheus uses for some
76 operations, for example `push_to_gateway`) because these cannot
77 generically guarantee no violations of HTTP RFC 2616 requirements for
78 the user to explicitly confirm redirects that could have unexpected
79 side effects (such as rendering a PUT request non-idempotent or
80 creating multiple resources not named in the original request).
81 """
82
83 def redirect_request(self, req, fp, code, msg, headers, newurl):
84 """
85 Apply redirect logic to a request.
86
87 See parent HTTPRedirectHandler.redirect_request for parameter info.
88
89 If the redirect is disallowed, this raises the corresponding HTTP error.
90 If the redirect can't be determined, return None to allow other handlers
91 to try. If the redirect is allowed, return the new request.
92
93 This method specialized for the case when (a) the user knows that the
94 redirect will not cause unacceptable side effects for any request method,
95 and (b) the user knows that any request data should be passed through to
96 the redirect. If either condition is not met, this should not be used.
97 """
98 # note that requests being provided by a handler will use get_method to
99 # indicate the method, by monkeypatching this, instead of setting the
100 # Request object's method attribute.
101 m = getattr(req, "method", req.get_method())
102 if not (code in (301, 302, 303, 307) and m in ("GET", "HEAD")
103 or code in (301, 302, 303) and m in ("POST", "PUT")):
104 raise HTTPError(req.full_url, code, msg, headers, fp)
105 new_request = Request(
106 newurl.replace(' ', '%20'), # space escaping in new url if needed.
107 headers=req.headers,
108 origin_req_host=req.origin_req_host,
109 unverifiable=True,
110 data=req.data,
111 )
112 new_request.method = m
113 return new_request
114
115
116def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
117 """Bake output for metrics output."""
118 # Choose the correct plain text format of the output.
119 encoder, content_type = choose_encoder(accept_header)
120 if 'name[]' in params:
121 registry = registry.restricted_registry(params['name[]'])
122 output = encoder(registry)
123 headers = [('Content-Type', content_type)]
124 # If gzip encoding required, gzip the output.
125 if not disable_compression and gzip_accepted(accept_encoding_header):
126 output = gzip.compress(output)
127 headers.append(('Content-Encoding', 'gzip'))
128 return '200 OK', headers, output
129
130
131def make_wsgi_app(registry: Collector = REGISTRY, disable_compression: bool = False) -> Callable:
132 """Create a WSGI app which serves the metrics from a registry."""
133
134 def prometheus_app(environ, start_response):
135 # Prepare parameters
136 accept_header = environ.get('HTTP_ACCEPT')
137 accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
138 params = parse_qs(environ.get('QUERY_STRING', ''))
139 method = environ['REQUEST_METHOD']
140
141 if method == 'OPTIONS':
142 status = '200 OK'
143 headers = [('Allow', 'OPTIONS,GET')]
144 output = b''
145 elif method != 'GET':
146 status = '405 Method Not Allowed'
147 headers = [('Allow', 'OPTIONS,GET')]
148 output = '# HTTP {}: {}; use OPTIONS or GET\n'.format(status, method).encode()
149 elif environ['PATH_INFO'] == '/favicon.ico':
150 # Serve empty response for browsers
151 status = '200 OK'
152 headers = []
153 output = b''
154 else:
155 # Note: For backwards compatibility, the URI path for GET is not
156 # constrained to the documented /metrics, but any path is allowed.
157 # Bake output
158 status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression)
159 # Return output
160 start_response(status, headers)
161 return [output]
162
163 return prometheus_app
164
165
166class _SilentHandler(WSGIRequestHandler):
167 """WSGI handler that does not log requests."""
168
169 def log_message(self, format, *args):
170 """Log nothing."""
171
172
173class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
174 """Thread per request HTTP server."""
175 # Make worker threads "fire and forget". Beginning with Python 3.7 this
176 # prevents a memory leak because ``ThreadingMixIn`` starts to gather all
177 # non-daemon threads in a list in order to join on them at server close.
178 daemon_threads = True
179
180
181def _get_best_family(address, port):
182 """Automatically select address family depending on address"""
183 # HTTPServer defaults to AF_INET, which will not start properly if
184 # binding an ipv6 address is requested.
185 # This function is based on what upstream python did for http.server
186 # in https://github.com/python/cpython/pull/11767
187 infos = socket.getaddrinfo(address, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE)
188 family, _, _, _, sockaddr = next(iter(infos))
189 return family, sockaddr[0]
190
191
192def _get_ssl_ctx(
193 certfile: str,
194 keyfile: str,
195 protocol: int,
196 cafile: Optional[str] = None,
197 capath: Optional[str] = None,
198 client_auth_required: bool = False,
199) -> ssl.SSLContext:
200 """Load context supports SSL."""
201 ssl_cxt = ssl.SSLContext(protocol=protocol)
202
203 if cafile is not None or capath is not None:
204 try:
205 ssl_cxt.load_verify_locations(cafile, capath)
206 except IOError as exc:
207 exc_type = type(exc)
208 msg = str(exc)
209 raise exc_type(f"Cannot load CA certificate chain from file "
210 f"{cafile!r} or directory {capath!r}: {msg}")
211 else:
212 try:
213 ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
214 except IOError as exc:
215 exc_type = type(exc)
216 msg = str(exc)
217 raise exc_type(f"Cannot load default CA certificate chain: {msg}")
218
219 if client_auth_required:
220 ssl_cxt.verify_mode = ssl.CERT_REQUIRED
221
222 try:
223 ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile)
224 except IOError as exc:
225 exc_type = type(exc)
226 msg = str(exc)
227 raise exc_type(f"Cannot load server certificate file {certfile!r} or "
228 f"its private key file {keyfile!r}: {msg}")
229
230 return ssl_cxt
231
232
233def start_wsgi_server(
234 port: int,
235 addr: str = '0.0.0.0',
236 registry: Collector = REGISTRY,
237 certfile: Optional[str] = None,
238 keyfile: Optional[str] = None,
239 client_cafile: Optional[str] = None,
240 client_capath: Optional[str] = None,
241 protocol: int = ssl.PROTOCOL_TLS_SERVER,
242 client_auth_required: bool = False,
243) -> Tuple[WSGIServer, threading.Thread]:
244 """Starts a WSGI server for prometheus metrics as a daemon thread."""
245
246 class TmpServer(ThreadingWSGIServer):
247 """Copy of ThreadingWSGIServer to update address_family locally"""
248
249 TmpServer.address_family, addr = _get_best_family(addr, port)
250 app = make_wsgi_app(registry)
251 httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
252 if certfile and keyfile:
253 context = _get_ssl_ctx(certfile, keyfile, protocol, client_cafile, client_capath, client_auth_required)
254 httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
255 t = threading.Thread(target=httpd.serve_forever)
256 t.daemon = True
257 t.start()
258
259 return httpd, t
260
261
262start_http_server = start_wsgi_server
263
264
265def generate_latest(registry: Collector = REGISTRY, escaping: str = openmetrics.UNDERSCORES) -> bytes:
266 """
267 Generates the exposition format using the basic Prometheus text format.
268
269 Params:
270 registry: Collector to export data from.
271 escaping: Escaping scheme used for metric and label names.
272
273 Returns: UTF-8 encoded string containing the metrics in text format.
274 """
275
276 def sample_line(samples):
277 if samples.labels:
278 labelstr = '{0}'.format(','.join(
279 # Label values always support UTF-8
280 ['{}="{}"'.format(
281 openmetrics.escape_label_name(k, escaping), openmetrics._escape(v, openmetrics.ALLOWUTF8, False))
282 for k, v in sorted(samples.labels.items())]))
283 else:
284 labelstr = ''
285 timestamp = ''
286 if samples.timestamp is not None:
287 # Convert to milliseconds.
288 timestamp = f' {int(float(samples.timestamp) * 1000):d}'
289 if escaping != openmetrics.ALLOWUTF8 or openmetrics._is_valid_legacy_metric_name(samples.name):
290 if labelstr:
291 labelstr = '{{{0}}}'.format(labelstr)
292 return f'{openmetrics.escape_metric_name(samples.name, escaping)}{labelstr} {floatToGoString(samples.value)}{timestamp}\n'
293 maybe_comma = ''
294 if labelstr:
295 maybe_comma = ','
296 return f'{{{openmetrics.escape_metric_name(samples.name, escaping)}{maybe_comma}{labelstr}}} {floatToGoString(samples.value)}{timestamp}\n'
297
298 output = []
299 for metric in registry.collect():
300 try:
301 mname = metric.name
302 mtype = metric.type
303 # Munging from OpenMetrics into Prometheus format.
304 if mtype == 'counter':
305 mname = mname + '_total'
306 elif mtype == 'info':
307 mname = mname + '_info'
308 mtype = 'gauge'
309 elif mtype == 'stateset':
310 mtype = 'gauge'
311 elif mtype == 'gaugehistogram':
312 # A gauge histogram is really a gauge,
313 # but this captures the structure better.
314 mtype = 'histogram'
315 elif mtype == 'unknown':
316 mtype = 'untyped'
317
318 output.append('# HELP {} {}\n'.format(
319 openmetrics.escape_metric_name(mname, escaping), metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
320 output.append(f'# TYPE {openmetrics.escape_metric_name(mname, escaping)} {mtype}\n')
321
322 om_samples: Dict[str, List[str]] = {}
323 for s in metric.samples:
324 for suffix in ['_created', '_gsum', '_gcount']:
325 if s.name == metric.name + suffix:
326 # OpenMetrics specific sample, put in a gauge at the end.
327 om_samples.setdefault(suffix, []).append(sample_line(s))
328 break
329 else:
330 output.append(sample_line(s))
331 except Exception as exception:
332 exception.args = (exception.args or ('',)) + (metric,)
333 raise
334
335 for suffix, lines in sorted(om_samples.items()):
336 output.append('# HELP {} {}\n'.format(openmetrics.escape_metric_name(metric.name + suffix, escaping),
337 metric.documentation.replace('\\', r'\\').replace('\n', r'\n')))
338 output.append(f'# TYPE {openmetrics.escape_metric_name(metric.name + suffix, escaping)} gauge\n')
339 output.extend(lines)
340 return ''.join(output).encode('utf-8')
341
342
343def choose_encoder(accept_header: str) -> Tuple[Callable[[Collector], bytes], str]:
344 # Python client library accepts a narrower range of content-types than
345 # Prometheus does.
346 accept_header = accept_header or ''
347 escaping = openmetrics.UNDERSCORES
348 for accepted in accept_header.split(','):
349 if accepted.split(';')[0].strip() == 'application/openmetrics-text':
350 toks = accepted.split(';')
351 version = _get_version(toks)
352 escaping = _get_escaping(toks)
353 # Only return an escaping header if we have a good version and
354 # mimetype.
355 if not version:
356 return (partial(openmetrics.generate_latest, escaping=openmetrics.UNDERSCORES, version="1.0.0"), openmetrics.CONTENT_TYPE_LATEST)
357 if version and parse_version(version) >= (1, 0, 0):
358 return (partial(openmetrics.generate_latest, escaping=escaping, version=version),
359 f'application/openmetrics-text; version={version}; charset=utf-8; escaping=' + str(escaping))
360 elif accepted.split(';')[0].strip() == 'text/plain':
361 toks = accepted.split(';')
362 version = _get_version(toks)
363 escaping = _get_escaping(toks)
364 # Only return an escaping header if we have a good version and
365 # mimetype.
366 if version and parse_version(version) >= (1, 0, 0):
367 return (partial(generate_latest, escaping=escaping),
368 CONTENT_TYPE_LATEST + '; escaping=' + str(escaping))
369 return generate_latest, CONTENT_TYPE_PLAIN_0_0_4
370
371
372def _get_version(accept_header: List[str]) -> str:
373 """Return the version tag from the Accept header.
374
375 If no version is specified, returns empty string."""
376
377 for tok in accept_header:
378 if '=' not in tok:
379 continue
380 key, value = tok.strip().split('=', 1)
381 if key == 'version':
382 return value
383 return ""
384
385
386def _get_escaping(accept_header: List[str]) -> str:
387 """Return the escaping scheme from the Accept header.
388
389 If no escaping scheme is specified or the scheme is not one of the allowed
390 strings, defaults to UNDERSCORES."""
391
392 for tok in accept_header:
393 if '=' not in tok:
394 continue
395 key, value = tok.strip().split('=', 1)
396 if key != 'escaping':
397 continue
398 if value == openmetrics.ALLOWUTF8:
399 return openmetrics.ALLOWUTF8
400 elif value == openmetrics.UNDERSCORES:
401 return openmetrics.UNDERSCORES
402 elif value == openmetrics.DOTS:
403 return openmetrics.DOTS
404 elif value == openmetrics.VALUES:
405 return openmetrics.VALUES
406 else:
407 return openmetrics.UNDERSCORES
408 return openmetrics.UNDERSCORES
409
410
411def gzip_accepted(accept_encoding_header: str) -> bool:
412 accept_encoding_header = accept_encoding_header or ''
413 for accepted in accept_encoding_header.split(','):
414 if accepted.split(';')[0].strip().lower() == 'gzip':
415 return True
416 return False
417
418
419class MetricsHandler(BaseHTTPRequestHandler):
420 """HTTP handler that gives metrics from ``REGISTRY``."""
421 registry: Collector = REGISTRY
422
423 def do_GET(self) -> None:
424 # Prepare parameters
425 registry = self.registry
426 accept_header = self.headers.get('Accept')
427 accept_encoding_header = self.headers.get('Accept-Encoding')
428 params = parse_qs(urlparse(self.path).query)
429 # Bake output
430 status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, False)
431 # Return output
432 self.send_response(int(status.split(' ')[0]))
433 for header in headers:
434 self.send_header(*header)
435 self.end_headers()
436 self.wfile.write(output)
437
438 def log_message(self, format: str, *args: Any) -> None:
439 """Log nothing."""
440
441 @classmethod
442 def factory(cls, registry: Collector) -> type:
443 """Returns a dynamic MetricsHandler class tied
444 to the passed registry.
445 """
446 # This implementation relies on MetricsHandler.registry
447 # (defined above and defaulted to REGISTRY).
448
449 # As we have unicode_literals, we need to create a str()
450 # object for type().
451 cls_name = str(cls.__name__)
452 MyMetricsHandler = type(cls_name, (cls, object),
453 {"registry": registry})
454 return MyMetricsHandler
455
456
457def write_to_textfile(path: str, registry: Collector, escaping: str = openmetrics.ALLOWUTF8, tmpdir: Optional[str] = None) -> None:
458 """Write metrics to the given path.
459
460 This is intended for use with the Node exporter textfile collector.
461 The path must end in .prom for the textfile collector to process it.
462
463 An optional tmpdir parameter can be set to determine where the
464 metrics will be temporarily written to. If not set, it will be in
465 the same directory as the .prom file. If provided, the path MUST be
466 on the same filesystem."""
467 if tmpdir is not None:
468 filename = os.path.basename(path)
469 tmppath = f'{os.path.join(tmpdir, filename)}.{os.getpid()}.{threading.current_thread().ident}'
470 else:
471 tmppath = f'{path}.{os.getpid()}.{threading.current_thread().ident}'
472 try:
473 with open(tmppath, 'wb') as f:
474 f.write(generate_latest(registry, escaping))
475
476 # rename(2) is atomic but fails on Windows if the destination file exists
477 if os.name == 'nt':
478 os.replace(tmppath, path)
479 else:
480 os.rename(tmppath, path)
481 except Exception:
482 if os.path.exists(tmppath):
483 os.remove(tmppath)
484 raise
485
486
487def _make_handler(
488 url: str,
489 method: str,
490 timeout: Optional[float],
491 headers: Sequence[Tuple[str, str]],
492 data: bytes,
493 base_handler: Union[BaseHandler, type],
494) -> Callable[[], None]:
495 def handle() -> None:
496 request = Request(url, data=data)
497 request.get_method = lambda: method # type: ignore
498 for k, v in headers:
499 request.add_header(k, v)
500 resp = build_opener(base_handler).open(request, timeout=timeout)
501 if resp.code >= 400:
502 raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}")
503
504 return handle
505
506
507def default_handler(
508 url: str,
509 method: str,
510 timeout: Optional[float],
511 headers: List[Tuple[str, str]],
512 data: bytes,
513) -> Callable[[], None]:
514 """Default handler that implements HTTP/HTTPS connections.
515
516 Used by the push_to_gateway functions. Can be re-used by other handlers."""
517
518 return _make_handler(url, method, timeout, headers, data, HTTPHandler)
519
520
521def passthrough_redirect_handler(
522 url: str,
523 method: str,
524 timeout: Optional[float],
525 headers: List[Tuple[str, str]],
526 data: bytes,
527) -> Callable[[], None]:
528 """
529 Handler that automatically trusts redirect responses for all HTTP methods.
530
531 Augments standard HTTPRedirectHandler capability by permitting PUT requests,
532 preserving the method upon redirect, and passing through all headers and
533 data from the original request. Only use this handler if you control or
534 trust the source of redirect responses you encounter when making requests
535 via the Prometheus client. This handler will simply repeat the identical
536 request, including same method and data, to the new redirect URL."""
537
538 return _make_handler(url, method, timeout, headers, data, _PrometheusRedirectHandler)
539
540
541def basic_auth_handler(
542 url: str,
543 method: str,
544 timeout: Optional[float],
545 headers: List[Tuple[str, str]],
546 data: bytes,
547 username: Optional[str] = None,
548 password: Optional[str] = None,
549) -> Callable[[], None]:
550 """Handler that implements HTTP/HTTPS connections with Basic Auth.
551
552 Sets auth headers using supplied 'username' and 'password', if set.
553 Used by the push_to_gateway functions. Can be re-used by other handlers."""
554
555 def handle():
556 """Handler that implements HTTP Basic Auth.
557 """
558 if username is not None and password is not None:
559 auth_value = f'{username}:{password}'.encode()
560 auth_token = base64.b64encode(auth_value)
561 auth_header = b'Basic ' + auth_token
562 headers.append(('Authorization', auth_header))
563 default_handler(url, method, timeout, headers, data)()
564
565 return handle
566
567
568def tls_auth_handler(
569 url: str,
570 method: str,
571 timeout: Optional[float],
572 headers: List[Tuple[str, str]],
573 data: bytes,
574 certfile: str,
575 keyfile: str,
576 cafile: Optional[str] = None,
577 protocol: int = ssl.PROTOCOL_TLS_CLIENT,
578 insecure_skip_verify: bool = False,
579) -> Callable[[], None]:
580 """Handler that implements an HTTPS connection with TLS Auth.
581
582 The default protocol (ssl.PROTOCOL_TLS_CLIENT) will also enable
583 ssl.CERT_REQUIRED and SSLContext.check_hostname by default. This can be
584 disabled by setting insecure_skip_verify to True.
585
586 Both this handler and the TLS feature on pushgateay are experimental."""
587 context = ssl.SSLContext(protocol=protocol)
588 if cafile is not None:
589 context.load_verify_locations(cafile)
590 else:
591 context.load_default_certs()
592
593 if insecure_skip_verify:
594 context.check_hostname = False
595 context.verify_mode = ssl.CERT_NONE
596
597 context.load_cert_chain(certfile=certfile, keyfile=keyfile)
598 handler = HTTPSHandler(context=context)
599 return _make_handler(url, method, timeout, headers, data, handler)
600
601
602def push_to_gateway(
603 gateway: str,
604 job: str,
605 registry: Collector,
606 grouping_key: Optional[Dict[str, Any]] = None,
607 timeout: Optional[float] = 30,
608 handler: Callable = default_handler,
609 compression: CompressionType = None,
610) -> None:
611 """Push metrics to the given pushgateway.
612
613 `gateway` the url for your push gateway. Either of the form
614 'http://pushgateway.local', or 'pushgateway.local'.
615 Scheme defaults to 'http' if none is provided
616 `job` is the job label to be attached to all pushed metrics
617 `registry` is a Collector, normally an instance of CollectorRegistry
618 `grouping_key` please see the pushgateway documentation for details.
619 Defaults to None
620 `timeout` is how long push will attempt to connect before giving up.
621 Defaults to 30s, can be set to None for no timeout.
622 `handler` is an optional function which can be provided to perform
623 requests to the 'gateway'.
624 Defaults to None, in which case an http or https request
625 will be carried out by a default handler.
626 If not None, the argument must be a function which accepts
627 the following arguments:
628 url, method, timeout, headers, and content
629 May be used to implement additional functionality not
630 supported by the built-in default handler (such as SSL
631 client certicates, and HTTP authentication mechanisms).
632 'url' is the URL for the request, the 'gateway' argument
633 described earlier will form the basis of this URL.
634 'method' is the HTTP method which should be used when
635 carrying out the request.
636 'timeout' requests not successfully completed after this
637 many seconds should be aborted. If timeout is None, then
638 the handler should not set a timeout.
639 'headers' is a list of ("header-name","header-value") tuples
640 which must be passed to the pushgateway in the form of HTTP
641 request headers.
642 The function should raise an exception (e.g. IOError) on
643 failure.
644 'content' is the data which should be used to form the HTTP
645 Message Body.
646 `compression` selects the payload compression. Supported values are 'gzip'
647 and 'snappy'. Defaults to None (no compression).
648
649 This overwrites all metrics with the same job and grouping_key.
650 This uses the PUT HTTP method."""
651 _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)
652
653
654def pushadd_to_gateway(
655 gateway: str,
656 job: str,
657 registry: Optional[Collector],
658 grouping_key: Optional[Dict[str, Any]] = None,
659 timeout: Optional[float] = 30,
660 handler: Callable = default_handler,
661 compression: CompressionType = None,
662) -> None:
663 """PushAdd metrics to the given pushgateway.
664
665 `gateway` the url for your push gateway. Either of the form
666 'http://pushgateway.local', or 'pushgateway.local'.
667 Scheme defaults to 'http' if none is provided
668 `job` is the job label to be attached to all pushed metrics
669 `registry` is a Collector, normally an instance of CollectorRegistry
670 `grouping_key` please see the pushgateway documentation for details.
671 Defaults to None
672 `timeout` is how long push will attempt to connect before giving up.
673 Defaults to 30s, can be set to None for no timeout.
674 `handler` is an optional function which can be provided to perform
675 requests to the 'gateway'.
676 Defaults to None, in which case an http or https request
677 will be carried out by a default handler.
678 See the 'prometheus_client.push_to_gateway' documentation
679 for implementation requirements.
680 `compression` selects the payload compression. Supported values are 'gzip'
681 and 'snappy'. Defaults to None (no compression).
682
683 This replaces metrics with the same name, job and grouping_key.
684 This uses the POST HTTP method."""
685 _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)
686
687
688def delete_from_gateway(
689 gateway: str,
690 job: str,
691 grouping_key: Optional[Dict[str, Any]] = None,
692 timeout: Optional[float] = 30,
693 handler: Callable = default_handler,
694) -> None:
695 """Delete metrics from the given pushgateway.
696
697 `gateway` the url for your push gateway. Either of the form
698 'http://pushgateway.local', or 'pushgateway.local'.
699 Scheme defaults to 'http' if none is provided
700 `job` is the job label to be attached to all pushed metrics
701 `grouping_key` please see the pushgateway documentation for details.
702 Defaults to None
703 `timeout` is how long delete will attempt to connect before giving up.
704 Defaults to 30s, can be set to None for no timeout.
705 `handler` is an optional function which can be provided to perform
706 requests to the 'gateway'.
707 Defaults to None, in which case an http or https request
708 will be carried out by a default handler.
709 See the 'prometheus_client.push_to_gateway' documentation
710 for implementation requirements.
711
712 This deletes metrics with the given job and grouping_key.
713 This uses the DELETE HTTP method."""
714 _use_gateway('DELETE', gateway, job, None, grouping_key, timeout, handler)
715
716
717def _use_gateway(
718 method: str,
719 gateway: str,
720 job: str,
721 registry: Optional[Collector],
722 grouping_key: Optional[Dict[str, Any]],
723 timeout: Optional[float],
724 handler: Callable,
725 compression: CompressionType = None,
726) -> None:
727 gateway_url = urlparse(gateway)
728 # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
729 if not gateway_url.scheme or gateway_url.scheme not in ['http', 'https']:
730 gateway = f'http://{gateway}'
731
732 gateway = gateway.rstrip('/')
733 url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))
734
735 if grouping_key is None:
736 grouping_key = {}
737 url += ''.join(
738 '/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
739 for k, v in sorted(grouping_key.items()))
740
741 data = b''
742 headers: List[Tuple[str, str]] = []
743 if method != 'DELETE':
744 if registry is None:
745 registry = REGISTRY
746 data = generate_latest(registry)
747 data, headers = _compress_payload(data, compression)
748 else:
749 # DELETE requests still need Content-Type header per test expectations
750 headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
751 if compression is not None:
752 raise ValueError('Compression is not supported for DELETE requests.')
753
754 handler(
755 url=url, method=method, timeout=timeout,
756 headers=headers, data=data,
757 )()
758
759
760def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
761 headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
762 if compression is None:
763 return data, headers
764
765 encoding = compression.lower()
766 if encoding == 'gzip':
767 headers.append(('Content-Encoding', 'gzip'))
768 return gzip.compress(data), headers
769 if encoding == 'snappy':
770 if not SNAPPY_AVAILABLE:
771 raise RuntimeError('Snappy compression requires the python-snappy package to be installed.')
772 headers.append(('Content-Encoding', 'snappy'))
773 compressor = snappy.StreamCompressor()
774 compressed = compressor.compress(data)
775 flush = getattr(compressor, 'flush', None)
776 if callable(flush):
777 compressed += flush()
778 return compressed, headers
779 raise ValueError(f"Unsupported compression type: {compression}")
780
781
782def _escape_grouping_key(k, v):
783 if v == "":
784 # Per https://github.com/prometheus/pushgateway/pull/346.
785 return k + "@base64", "="
786 elif '/' in v:
787 # Added in Pushgateway 0.9.0.
788 return k + "@base64", base64.urlsafe_b64encode(v.encode("utf-8")).decode("utf-8")
789 else:
790 return k, quote_plus(v)
791
792
793def instance_ip_grouping_key() -> Dict[str, Any]:
794 """Grouping key with instance set to the IP Address of this host."""
795 with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
796 if sys.platform == 'darwin':
797 # This check is done this way only on MacOS devices
798 # it is done this way because the localhost method does
799 # not work.
800 # This method was adapted from this StackOverflow answer:
801 # https://stackoverflow.com/a/28950776
802 s.connect(('10.255.255.255', 1))
803 else:
804 s.connect(('localhost', 0))
805
806 return {'instance': s.getsockname()[0]}
807
808
809from .asgi import make_asgi_app # noqa