1import base64
2from contextlib import closing
3from functools import partial
4import gzip
5from http.server import BaseHTTPRequestHandler
6import os
7import socket
8from socketserver import ThreadingMixIn
9import ssl
10import sys
11import threading
12from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
13from urllib.error import HTTPError
14from urllib.parse import parse_qs, quote_plus, urlparse
15from urllib.request import (
16 BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler,
17 Request,
18)
19from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
20
21from .openmetrics import exposition as openmetrics
22from .registry import CollectorRegistry, REGISTRY
23from .utils import floatToGoString, parse_version
24
# Names exported by a star-import of this module. ``make_asgi_app`` is not
# defined here; it is imported from ``.asgi`` at the bottom of this file.
__all__ = (
    'CONTENT_TYPE_LATEST',
    'CONTENT_TYPE_PLAIN_0_0_4',
    'CONTENT_TYPE_PLAIN_1_0_0',
    'delete_from_gateway',
    'generate_latest',
    'instance_ip_grouping_key',
    'make_asgi_app',
    'make_wsgi_app',
    'MetricsHandler',
    'push_to_gateway',
    'pushadd_to_gateway',
    'start_http_server',
    'start_wsgi_server',
    'write_to_textfile',
)

# Returned by choose_encoder() as the fallback content type, and sent as the
# Content-Type of requests made to the pushgateway.
CONTENT_TYPE_PLAIN_0_0_4 = 'text/plain; version=0.0.4; charset=utf-8'
"""Content type of the compatibility format"""

# Used (with an ``escaping`` parameter appended) when a client asks for
# text/plain with version >= 1.0.0.
CONTENT_TYPE_PLAIN_1_0_0 = 'text/plain; version=1.0.0; charset=utf-8'
"""Content type of the latest format"""

# Alias for the newest plain-text content type defined above.
CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
49
50
class _PrometheusRedirectHandler(HTTPRedirectHandler):
    """
    Redirect handler that replays the original request at the new location.

    Unlike the stock HTTPRedirectHandler, this handler keeps the request
    method (including PUT, which `push_to_gateway` relies on) and forwards
    the request body and headers to the redirect target.

    Using this class means the caller explicitly accepts whatever redirects
    the server issues. Only use it when the redirecting endpoint is
    controlled or trusted and it is known to be safe to resend the complete
    request (method and data) to the redirected URL, e.g. a cosmetic URL
    redirect sitting in front of a local Prometheus deployment.

    The standard handler refuses to do this because HTTP (RFC 2616) expects
    the user to confirm redirects that may have side effects, such as making
    a PUT non-idempotent or creating resources the original request did not
    name.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """
        Build the follow-up request for a redirect response.

        Parameters have the same meaning as in
        HTTPRedirectHandler.redirect_request.

        Returns a new Request carrying the original method, headers and data
        when the (status code, method) combination is accepted. Raises
        HTTPError for every other combination.

        Only appropriate when the caller knows that (a) following the
        redirect cannot have unacceptable side effects for any method and
        (b) the request data should be passed on unchanged.
        """
        # Handlers created in this module set the method by monkeypatching
        # get_method rather than the ``method`` attribute, so consult both.
        verb = getattr(req, "method", req.get_method())

        read_only_redirect = code in (301, 302, 303, 307) and verb in ("GET", "HEAD")
        write_redirect = code in (301, 302, 303) and verb in ("POST", "PUT")
        if not read_only_redirect and not write_redirect:
            raise HTTPError(req.full_url, code, msg, headers, fp)

        # Escape spaces in the target URL if needed.
        target = newurl.replace(' ', '%20')
        follow_up = Request(
            target,
            data=req.data,
            headers=req.headers,
            origin_req_host=req.origin_req_host,
            unverifiable=True,
        )
        follow_up.method = verb
        return follow_up
104
105
def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
    """Render the registry into a ``(status, headers, body)`` response triple.

    The encoder and content type are picked from ``accept_header``. When
    ``params`` contains ``name[]`` the registry is first narrowed through
    ``registry.restricted_registry``. The body is gzip-compressed when
    compression is not disabled and ``accept_encoding_header`` accepts gzip.
    """
    # Choose the plain text format that matches the Accept header.
    encode, content_type = choose_encoder(accept_header)
    if 'name[]' in params:
        registry = registry.restricted_registry(params['name[]'])

    body = encode(registry)
    response_headers = [('Content-Type', content_type)]

    # Compress only when allowed by both the server and the client.
    compress = (not disable_compression) and gzip_accepted(accept_encoding_header)
    if compress:
        body = gzip.compress(body)
        response_headers.append(('Content-Encoding', 'gzip'))

    return '200 OK', response_headers, body
119
120
def make_wsgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable:
    """Create a WSGI app which serves the metrics from a registry.

    Params:
        registry: CollectorRegistry whose metrics are exposed.
        disable_compression: When True the response is never gzip-compressed,
            regardless of the request's Accept-Encoding header.

    Returns: a WSGI application callable ``app(environ, start_response)``.

    The app answers OPTIONS with an ``Allow`` header, rejects every method
    other than GET/OPTIONS with 405, serves an empty body for
    ``/favicon.ico`` and serves the metrics for any other GET path.
    """

    def prometheus_app(environ, start_response):
        # Prepare parameters
        accept_header = environ.get('HTTP_ACCEPT')
        accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
        params = parse_qs(environ.get('QUERY_STRING', ''))
        method = environ['REQUEST_METHOD']
        # PEP 3333 lets a server omit PATH_INFO when it is empty, so a plain
        # environ['PATH_INFO'] lookup could raise KeyError on a valid request.
        path = environ.get('PATH_INFO', '')

        if method == 'OPTIONS':
            status = '200 OK'
            headers = [('Allow', 'OPTIONS,GET')]
            output = b''
        elif method != 'GET':
            status = '405 Method Not Allowed'
            headers = [('Allow', 'OPTIONS,GET')]
            output = '# HTTP {}: {}; use OPTIONS or GET\n'.format(status, method).encode()
        elif path == '/favicon.ico':
            # Serve empty response for browsers
            status = '200 OK'
            headers = []
            output = b''
        else:
            # Note: For backwards compatibility, the URI path for GET is not
            # constrained to the documented /metrics, but any path is allowed.
            status, headers, output = _bake_output(
                registry, accept_header, accept_encoding_header, params, disable_compression)
        # Return output
        start_response(status, headers)
        return [output]

    return prometheus_app
154
155
class _SilentHandler(WSGIRequestHandler):
    """WSGI request handler with per-request logging switched off."""

    def log_message(self, format, *args):
        """Discard the log message instead of emitting it."""
        return None
161
162
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles each request in its own thread."""

    # Worker threads are daemonic ("fire and forget"). Since Python 3.7
    # ``ThreadingMixIn`` collects all non-daemon threads in a list so it can
    # join them at server close; daemon threads avoid that ever-growing list
    # and the memory leak that comes with it.
    daemon_threads = True
169
170
def _get_best_family(address, port):
    """Pick the socket address family that fits ``address``.

    Returns ``(family, host)`` taken from the first ``getaddrinfo`` result.
    """
    # HTTPServer defaults to AF_INET, which will not start properly when an
    # IPv6 address is requested. Based on what upstream Python did for
    # http.server in https://github.com/python/cpython/pull/11767
    candidates = socket.getaddrinfo(
        address,
        port,
        type=socket.SOCK_STREAM,
        flags=socket.AI_PASSIVE,
    )
    best = next(iter(candidates))
    family = best[0]
    sockaddr = best[4]
    return family, sockaddr[0]
180
181
def _get_ssl_ctx(
        certfile: str,
        keyfile: str,
        protocol: int,
        cafile: Optional[str] = None,
        capath: Optional[str] = None,
        client_auth_required: bool = False,
) -> ssl.SSLContext:
    """Build the server-side SSL context.

    Params:
        certfile: Path of the server certificate file.
        keyfile: Path of the server certificate's private key file.
        protocol: ``ssl.PROTOCOL_*`` constant passed to ``ssl.SSLContext``.
        cafile: Optional CA bundle file used to verify client certificates.
        capath: Optional CA directory used to verify client certificates.
        client_auth_required: When True, ``verify_mode`` is set to
            ``ssl.CERT_REQUIRED``.

    Returns: the configured ``ssl.SSLContext``.

    Raises: an exception of the same ``OSError`` subclass that the ssl module
    raised, with a message naming the files involved. The original exception
    is attached as ``__cause__``.
    """
    ssl_cxt = ssl.SSLContext(protocol=protocol)

    if cafile is not None or capath is not None:
        try:
            ssl_cxt.load_verify_locations(cafile, capath)
        except OSError as exc:
            # Re-raise with the same type so existing ``except`` clauses keep
            # matching, but chain the low-level error explicitly.
            raise type(exc)(f"Cannot load CA certificate chain from file "
                            f"{cafile!r} or directory {capath!r}: {exc}") from exc
    else:
        try:
            ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
        except OSError as exc:
            raise type(exc)(f"Cannot load default CA certificate chain: {exc}") from exc

    if client_auth_required:
        ssl_cxt.verify_mode = ssl.CERT_REQUIRED

    try:
        ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile)
    except OSError as exc:
        raise type(exc)(f"Cannot load server certificate file {certfile!r} or "
                        f"its private key file {keyfile!r}: {exc}") from exc

    return ssl_cxt
221
222
def start_wsgi_server(
        port: int,
        addr: str = '0.0.0.0',
        registry: CollectorRegistry = REGISTRY,
        certfile: Optional[str] = None,
        keyfile: Optional[str] = None,
        client_cafile: Optional[str] = None,
        client_capath: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_SERVER,
        client_auth_required: bool = False,
) -> Tuple[WSGIServer, threading.Thread]:
    """Starts a WSGI server for prometheus metrics as a daemon thread.

    Params:
        port: TCP port to listen on.
        addr: Address to bind; the address family is derived from it.
        registry: CollectorRegistry whose metrics are served.
        certfile, keyfile: When both are given the listening socket is
            wrapped with TLS using this certificate and key.
        client_cafile, client_capath: CA locations used to verify client
            certificates (only used when TLS is enabled).
        protocol: ``ssl.PROTOCOL_*`` constant for the TLS context.
        client_auth_required: Require a client certificate when TLS is enabled.

    Returns: ``(server, thread)`` - the running server and the daemon thread
    executing its ``serve_forever`` loop.

    Raises: whatever server creation or TLS setup raises. If setup fails
    after the socket has been bound, the server is closed before the error
    propagates so the port is released.
    """

    class TmpServer(ThreadingWSGIServer):
        """Copy of ThreadingWSGIServer to update address_family locally"""

    TmpServer.address_family, addr = _get_best_family(addr, port)
    app = make_wsgi_app(registry)
    httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
    try:
        if certfile and keyfile:
            context = _get_ssl_ctx(certfile, keyfile, protocol, client_cafile, client_capath, client_auth_required)
            httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
        t = threading.Thread(target=httpd.serve_forever)
        t.daemon = True
        t.start()
    except Exception:
        # make_server() has already bound the listening socket; without this
        # a failed TLS setup would leave the port occupied by an unused server.
        httpd.server_close()
        raise

    return httpd, t


start_http_server = start_wsgi_server
253
254
def generate_latest(registry: CollectorRegistry = REGISTRY, escaping: str = openmetrics.UNDERSCORES) -> bytes:
    """
    Render a registry in the basic Prometheus text exposition format.

    Params:
        registry: CollectorRegistry to export data from.
        escaping: Escaping scheme used for metric and label names.

    Returns: UTF-8 encoded string containing the metrics in text format.

    Any exception raised while rendering a metric is re-raised with that
    metric appended to its ``args``.
    """

    def escape_help(text):
        # Backslashes and newlines must be escaped in HELP lines.
        return text.replace('\\', r'\\').replace('\n', r'\n')

    def sample_line(samples):
        label_pairs = ''
        if samples.labels:
            # Label values always support UTF-8
            label_pairs = ','.join(
                '{}="{}"'.format(
                    openmetrics.escape_label_name(key, escaping),
                    openmetrics._escape(val, openmetrics.ALLOWUTF8, False))
                for key, val in sorted(samples.labels.items()))

        stamp = ''
        if samples.timestamp is not None:
            # Convert to milliseconds.
            stamp = f' {int(float(samples.timestamp) * 1000):d}'

        legacy_syntax = (escaping != openmetrics.ALLOWUTF8
                         or openmetrics._is_valid_legacy_metric_name(samples.name))
        if legacy_syntax:
            braces = '{{{0}}}'.format(label_pairs) if label_pairs else ''
            return f'{openmetrics.escape_metric_name(samples.name, escaping)}{braces} {floatToGoString(samples.value)}{stamp}\n'

        # Otherwise the metric name is emitted inside the braces, ahead of
        # any labels.
        separator = ',' if label_pairs else ''
        return f'{{{openmetrics.escape_metric_name(samples.name, escaping)}{separator}{label_pairs}}} {floatToGoString(samples.value)}{stamp}\n'

    # Munging from OpenMetrics into Prometheus format.
    name_suffix_by_type = {'counter': '_total', 'info': '_info'}
    prom_type_by_type = {
        'info': 'gauge',
        'stateset': 'gauge',
        # A gauge histogram is really a gauge,
        # but this captures the structure better.
        'gaugehistogram': 'histogram',
        'unknown': 'untyped',
    }
    om_only_suffixes = ('_created', '_gsum', '_gcount')

    chunks = []
    for metric in registry.collect():
        try:
            exposed_name = metric.name + name_suffix_by_type.get(metric.type, '')
            exposed_type = prom_type_by_type.get(metric.type, metric.type)

            escaped_name = openmetrics.escape_metric_name(exposed_name, escaping)
            chunks.append('# HELP {} {}\n'.format(escaped_name, escape_help(metric.documentation)))
            chunks.append(f'# TYPE {escaped_name} {exposed_type}\n')

            om_samples: Dict[str, List[str]] = {}
            for s in metric.samples:
                om_suffix = next(
                    (sfx for sfx in om_only_suffixes if s.name == metric.name + sfx),
                    None,
                )
                rendered = sample_line(s)
                if om_suffix is None:
                    chunks.append(rendered)
                else:
                    # OpenMetrics specific sample, put in a gauge at the end.
                    om_samples.setdefault(om_suffix, []).append(rendered)
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            gauge_name = openmetrics.escape_metric_name(metric.name + suffix, escaping)
            chunks.append('# HELP {} {}\n'.format(gauge_name, escape_help(metric.documentation)))
            chunks.append(f'# TYPE {gauge_name} gauge\n')
            chunks.extend(lines)
    return ''.join(chunks).encode('utf-8')
331
332
def choose_encoder(accept_header: str) -> Tuple[Callable[[CollectorRegistry], bytes], str]:
    """Select the encoder and content type for an ``Accept`` header.

    Entries are examined in header order. ``application/openmetrics-text``
    without a version, or with a version >= 1.0.0, selects the OpenMetrics
    encoder. ``text/plain`` with a version >= 1.0.0 selects the text encoder
    with the requested escaping. If no entry matches, the 0.0.4 text format
    is returned.

    Returns: ``(encoder, content_type)`` where ``encoder`` takes a registry
    and returns the encoded bytes.
    """
    # Python client library accepts a narrower range of content-types than
    # Prometheus does.
    for accepted in (accept_header or '').split(','):
        toks = accepted.split(';')
        mimetype = toks[0].strip()

        if mimetype == 'application/openmetrics-text':
            version = _get_version(toks)
            escaping = _get_escaping(toks)
            # Only return an escaping header if we have a good version and
            # mimetype.
            if not version:
                unversioned_encoder = partial(
                    openmetrics.generate_latest, escaping=openmetrics.UNDERSCORES, version="1.0.0")
                return (unversioned_encoder, openmetrics.CONTENT_TYPE_LATEST)
            if parse_version(version) >= (1, 0, 0):
                return (partial(openmetrics.generate_latest, escaping=escaping, version=version),
                        f'application/openmetrics-text; version={version}; charset=utf-8; escaping=' + str(escaping))
        elif mimetype == 'text/plain':
            version = _get_version(toks)
            escaping = _get_escaping(toks)
            # Only return an escaping header if we have a good version and
            # mimetype.
            if version and parse_version(version) >= (1, 0, 0):
                return (partial(generate_latest, escaping=escaping),
                        CONTENT_TYPE_LATEST + '; escaping=' + str(escaping))

    return generate_latest, CONTENT_TYPE_PLAIN_0_0_4
360
361
def _get_version(accept_header: List[str]) -> str:
    """Extract the ``version`` parameter from split Accept-header tokens.

    ``accept_header`` is the list of ``;``-separated tokens of one Accept
    entry. The value of the first ``version=...`` token is returned; an empty
    string is returned when there is none."""

    for tok in accept_header:
        key, sep, value = tok.strip().partition('=')
        # ``sep`` is empty when the token holds no '=' at all.
        if sep and key == 'version':
            return value
    return ""
374
375
def _get_escaping(accept_header: List[str]) -> str:
    """Extract the escaping scheme from split Accept-header tokens.

    Only the first ``escaping=...`` token is considered. If it names one of
    the allowed schemes that scheme is returned; if it is missing or names
    anything else, UNDERSCORES is returned."""

    allowed_schemes = (
        openmetrics.ALLOWUTF8,
        openmetrics.UNDERSCORES,
        openmetrics.DOTS,
        openmetrics.VALUES,
    )
    for tok in accept_header:
        key, sep, value = tok.strip().partition('=')
        if not sep or key != 'escaping':
            continue
        for scheme in allowed_schemes:
            if value == scheme:
                return scheme
        # Unknown scheme requested: fall back to the default right away.
        return openmetrics.UNDERSCORES
    return openmetrics.UNDERSCORES
399
400
def _gzip_refused(params: List[str]) -> bool:
    """Return True when the coding parameters carry a zero quality value."""
    for param in params:
        name, _, raw_value = param.partition('=')
        if name.strip().lower() != 'q':
            continue
        try:
            return float(raw_value.strip()) == 0
        except ValueError:
            # Malformed qvalue: stay lenient and treat the coding as accepted.
            return False
    return False


def gzip_accepted(accept_encoding_header: Optional[str]) -> bool:
    """Tell whether an ``Accept-Encoding`` header permits a gzip response.

    Params:
        accept_encoding_header: Raw header value; ``None`` or an empty string
            means the client sent no such header.

    Returns: True if the header lists ``gzip`` (case-insensitively) with a
    quality value other than zero. ``gzip;q=0`` is an explicit refusal of the
    coding and therefore does not count as acceptance.
    """
    for accepted in (accept_encoding_header or '').split(','):
        coding, *params = accepted.split(';')
        if coding.strip().lower() != 'gzip':
            continue
        if not _gzip_refused(params):
            return True
    return False
407
408
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that gives metrics from ``REGISTRY``."""
    # Registry served by this handler class; ``factory`` creates subclasses
    # bound to a different registry.
    registry: CollectorRegistry = REGISTRY

    def do_GET(self) -> None:
        """Answer a GET request with the encoded metrics of ``self.registry``."""
        query = parse_qs(urlparse(self.path).query)
        # Compression is never disabled for this handler.
        status, headers, output = _bake_output(
            self.registry,
            self.headers.get('Accept'),
            self.headers.get('Accept-Encoding'),
            query,
            False,
        )
        # The status line looks like '200 OK'; send_response wants the code.
        status_code = int(status.split(' ')[0])
        self.send_response(status_code)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(output)

    def log_message(self, format: str, *args: Any) -> None:
        """Discard the log message instead of emitting it."""
        return None

    @classmethod
    def factory(cls, registry: CollectorRegistry) -> type:
        """Create a subclass of this handler bound to ``registry``.

        The returned class has the same name as ``cls`` and overrides the
        ``registry`` class attribute, which ``do_GET`` reads.
        """
        return type(cls.__name__, (cls, object), {"registry": registry})
445
446
def write_to_textfile(path: str, registry: CollectorRegistry, escaping: str = openmetrics.ALLOWUTF8, tmpdir: Optional[str] = None) -> None:
    """Write the registry's metrics to ``path`` via a temporary file.

    This is intended for use with the Node exporter textfile collector.
    The path must end in .prom for the textfile collector to process it.

    The metrics are first written to a temporary file whose name carries the
    process id and thread ident, and that file is then moved onto ``path``.
    By default the temporary file sits next to ``path``; pass ``tmpdir`` to
    place it elsewhere. If provided, ``tmpdir`` MUST be on the same
    filesystem as ``path``.

    On any exception the temporary file is removed (if it exists) and the
    exception is re-raised."""
    staging_base = path if tmpdir is None else os.path.join(tmpdir, os.path.basename(path))
    tmppath = f'{staging_base}.{os.getpid()}.{threading.current_thread().ident}'

    # rename(2) is atomic but fails on Windows if the destination file exists
    move = os.replace if os.name == 'nt' else os.rename

    try:
        with open(tmppath, 'wb') as f:
            f.write(generate_latest(registry, escaping))
        move(tmppath, path)
    except Exception:
        if os.path.exists(tmppath):
            os.remove(tmppath)
        raise
475
476
def _make_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: Sequence[Tuple[str, str]],
        data: bytes,
        base_handler: Union[BaseHandler, type],
) -> Callable[[], None]:
    """Create a callable that performs one HTTP request when invoked.

    Params:
        url: Target URL of the request.
        method: HTTP method to use.
        timeout: Timeout in seconds passed to the opener, or None.
        headers: ``(name, value)`` pairs added to the request.
        data: Request body.
        base_handler: urllib handler class or instance handed to
            ``build_opener``.

    Returns: a zero-argument callable. Calling it sends the request and
    raises ``OSError`` when the response status is 400 or above; errors
    raised by the opener itself propagate unchanged.
    """
    def handle() -> None:
        request = Request(url, data=data)
        # Request picks GET/POST from the presence of data; override it.
        request.get_method = lambda: method  # type: ignore
        for k, v in headers:
            request.add_header(k, v)
        # Close the response deterministically so the underlying connection
        # is released even when the status check below raises.
        with closing(build_opener(base_handler).open(request, timeout=timeout)) as resp:
            if resp.code >= 400:
                raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}")

    return handle
495
496
def default_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """Build the standard request callable for plain HTTP/HTTPS connections.

    This is the handler the push_to_gateway family of functions uses by
    default; other handlers may delegate to it."""

    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=HTTPHandler,
    )
509
510
def passthrough_redirect_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """
    Build a request callable that trusts redirects for all HTTP methods.

    Compared with the standard HTTPRedirectHandler it also permits PUT,
    keeps the method across the redirect and forwards all headers and data of
    the original request, i.e. the identical request is repeated at the
    redirect URL. Only use this handler if you control or trust the source of
    the redirect responses you encounter when making requests via the
    Prometheus client."""

    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=_PrometheusRedirectHandler,
    )
529
530
def basic_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        username: Optional[str] = None,
        password: Optional[str] = None,
) -> Callable[[], None]:
    """Handler that implements HTTP/HTTPS connections with Basic Auth.

    Sets auth headers using supplied 'username' and 'password', if set.
    Used by the push_to_gateway functions. Can be re-used by other handlers.

    The ``headers`` list passed in is not modified; the Authorization header
    is added to a copy, so calling the returned callable repeatedly never
    accumulates duplicate headers.

    Returns: a zero-argument callable that performs the request through
    ``default_handler``."""

    def handle():
        """Send the request, adding the Basic Auth header when configured."""
        # Work on a copy: appending to the caller's list would leak the
        # credentials into it and add another header on every invocation.
        request_headers = list(headers)
        if username is not None and password is not None:
            auth_value = f'{username}:{password}'.encode()
            auth_token = base64.b64encode(auth_value)
            auth_header = b'Basic ' + auth_token
            request_headers.append(('Authorization', auth_header))
        default_handler(url, method, timeout, request_headers, data)()

    return handle
556
557
def tls_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        certfile: str,
        keyfile: str,
        cafile: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_CLIENT,
        insecure_skip_verify: bool = False,
) -> Callable[[], None]:
    """Build a request callable for an HTTPS connection with TLS client auth.

    The client certificate chain is loaded from ``certfile``/``keyfile``.
    Server certificates are verified against ``cafile`` when given, otherwise
    against the default certificates.

    The default protocol (ssl.PROTOCOL_TLS_CLIENT) will also enable
    ssl.CERT_REQUIRED and SSLContext.check_hostname by default. This can be
    disabled by setting insecure_skip_verify to True.

    Both this handler and the TLS feature on pushgateway are experimental."""
    context = ssl.SSLContext(protocol=protocol)
    if cafile is None:
        context.load_default_certs()
    else:
        context.load_verify_locations(cafile)

    if insecure_skip_verify:
        # check_hostname is switched off before verify_mode is relaxed.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return _make_handler(url, method, timeout, headers, data, HTTPSHandler(context=context))
590
591
def push_to_gateway(
        gateway: str,
        job: str,
        registry: CollectorRegistry,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """Push metrics to the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which performs the request to the
              'gateway'. By default an http or https request is carried out
              by the built-in default handler.
              A custom handler must accept the following keyword arguments:
              url, method, timeout, headers, and data, and must return a
              callable taking no arguments which carries out the request.
              It may be used to implement functionality not supported by the
              built-in default handler (such as SSL client certificates and
              HTTP authentication mechanisms).
              'url' is the URL for the request; the 'gateway' argument
              described earlier forms the basis of this URL.
              'method' is the HTTP method which should be used when
              carrying out the request.
              'timeout': requests not successfully completed after this
              many seconds should be aborted. If timeout is None, the
              handler should not set a timeout.
              'headers' is a list of ("header-name", "header-value") tuples
              which must be passed to the pushgateway in the form of HTTP
              request headers.
              'data' is the content which should be used to form the HTTP
              message body.
              The function should raise an exception (e.g. IOError) on
              failure.

    This overwrites all metrics with the same job and grouping_key.
    This uses the PUT HTTP method."""
    _use_gateway(
        method='PUT',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
639
640
def pushadd_to_gateway(
        gateway: str,
        job: str,
        registry: Optional[CollectorRegistry],
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """PushAdd metrics to the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which performs the request to the
              'gateway'. By default an http or https request is carried out
              by the built-in default handler.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.

    This replaces metrics with the same name, job and grouping_key.
    This uses the POST HTTP method."""
    _use_gateway(
        method='POST',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
670
671
def delete_from_gateway(
        gateway: str,
        job: str,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """Delete metrics from the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label of the metrics to delete.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long delete will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which performs the request to the
              'gateway'. By default an http or https request is carried out
              by the built-in default handler.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.

    This deletes metrics with the given job and grouping_key.
    This uses the DELETE HTTP method."""
    _use_gateway(
        method='DELETE',
        gateway=gateway,
        job=job,
        registry=None,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
699
700
def _use_gateway(
        method: str,
        gateway: str,
        job: str,
        registry: Optional[CollectorRegistry],
        grouping_key: Optional[Dict[str, Any]],
        timeout: Optional[float],
        handler: Callable,
) -> None:
    """Build the pushgateway URL and body, then run the request via ``handler``.

    The URL is ``<gateway>/metrics/job/<job>`` followed by one
    ``/<key>/<value>`` pair per grouping-key entry, in sorted order. For every
    method except DELETE the body is the text exposition of ``registry``
    (the default ``REGISTRY`` when ``registry`` is None); DELETE sends an
    empty body.
    """
    # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
    # An empty scheme is covered by the membership test as well.
    if urlparse(gateway).scheme not in ('http', 'https'):
        gateway = f'http://{gateway}'

    url_parts = ['{}/metrics/{}/{}'.format(gateway.rstrip('/'), *_escape_grouping_key("job", job))]

    data = b''
    if method != 'DELETE':
        data = generate_latest(REGISTRY if registry is None else registry)

    labels = {} if grouping_key is None else grouping_key
    url_parts.extend(
        '/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
        for k, v in sorted(labels.items()))

    request = handler(
        url=''.join(url_parts),
        method=method,
        timeout=timeout,
        headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)],
        data=data,
    )
    request()
734
735
def _escape_grouping_key(k, v):
    """Encode one grouping-key pair for use in a pushgateway URL path.

    Returns ``(key, value)``. Empty values and values containing ``/`` are
    sent in the ``@base64`` form; every other value is URL-quoted.
    """
    if v == "":
        # Per https://github.com/prometheus/pushgateway/pull/346.
        return k + "@base64", "="
    if '/' not in v:
        return k, quote_plus(v)
    # Added in Pushgateway 0.9.0.
    encoded = base64.urlsafe_b64encode(v.encode("utf-8"))
    return k + "@base64", encoded.decode("utf-8")
745
746
def instance_ip_grouping_key() -> Dict[str, Any]:
    """Return a grouping key whose ``instance`` is this host's IP address.

    The address is read from the local end of a UDP socket after connecting
    it to a probe target."""
    if sys.platform == 'darwin':
        # This check is done this way only on MacOS devices because the
        # localhost method does not work there.
        # This method was adapted from this StackOverflow answer:
        # https://stackoverflow.com/a/28950776
        probe_target = ('10.255.255.255', 1)
    else:
        probe_target = ('localhost', 0)

    with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as probe:
        probe.connect(probe_target)
        local_ip = probe.getsockname()[0]
        return {'instance': local_ip}
761
762
763from .asgi import make_asgi_app # noqa