1import base64
2from contextlib import closing
3import gzip
4from http.server import BaseHTTPRequestHandler
5import os
6import socket
7from socketserver import ThreadingMixIn
8import ssl
9import sys
10import threading
11from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
12from urllib.error import HTTPError
13from urllib.parse import parse_qs, quote_plus, urlparse
14from urllib.request import (
15 BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler,
16 Request,
17)
18from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
19
20from .openmetrics import exposition as openmetrics
21from .registry import CollectorRegistry, REGISTRY
22from .utils import floatToGoString
23
# Names exported by ``from <module> import *``. ``make_asgi_app`` is not
# defined in this file; it is imported from ``.asgi`` at the bottom of the
# module so that it can be re-exported here.
__all__ = (
    'CONTENT_TYPE_LATEST',
    'delete_from_gateway',
    'generate_latest',
    'instance_ip_grouping_key',
    'make_asgi_app',
    'make_wsgi_app',
    'MetricsHandler',
    'push_to_gateway',
    'pushadd_to_gateway',
    'start_http_server',
    'start_wsgi_server',
    'write_to_textfile',
)

# Content-Type value paired with ``generate_latest`` by ``choose_encoder`` and
# sent as the Content-Type header of pushgateway requests.
CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8'
"""Content type of the latest text format"""
41
42
43class _PrometheusRedirectHandler(HTTPRedirectHandler):
44 """
45 Allow additional methods (e.g. PUT) and data forwarding in redirects.
46
47 Use of this class constitute a user's explicit agreement to the
48 redirect responses the Prometheus client will receive when using it.
49 You should only use this class if you control or otherwise trust the
50 redirect behavior involved and are certain it is safe to full transfer
51 the original request (method and data) to the redirected URL. For
52 example, if you know there is a cosmetic URL redirect in front of a
53 local deployment of a Prometheus server, and all redirects are safe,
54 this is the class to use to handle redirects in that case.
55
56 The standard HTTPRedirectHandler does not forward request data nor
57 does it allow redirected PUT requests (which Prometheus uses for some
58 operations, for example `push_to_gateway`) because these cannot
59 generically guarantee no violations of HTTP RFC 2616 requirements for
60 the user to explicitly confirm redirects that could have unexpected
61 side effects (such as rendering a PUT request non-idempotent or
62 creating multiple resources not named in the original request).
63 """
64
65 def redirect_request(self, req, fp, code, msg, headers, newurl):
66 """
67 Apply redirect logic to a request.
68
69 See parent HTTPRedirectHandler.redirect_request for parameter info.
70
71 If the redirect is disallowed, this raises the corresponding HTTP error.
72 If the redirect can't be determined, return None to allow other handlers
73 to try. If the redirect is allowed, return the new request.
74
75 This method specialized for the case when (a) the user knows that the
76 redirect will not cause unacceptable side effects for any request method,
77 and (b) the user knows that any request data should be passed through to
78 the redirect. If either condition is not met, this should not be used.
79 """
80 # note that requests being provided by a handler will use get_method to
81 # indicate the method, by monkeypatching this, instead of setting the
82 # Request object's method attribute.
83 m = getattr(req, "method", req.get_method())
84 if not (code in (301, 302, 303, 307) and m in ("GET", "HEAD")
85 or code in (301, 302, 303) and m in ("POST", "PUT")):
86 raise HTTPError(req.full_url, code, msg, headers, fp)
87 new_request = Request(
88 newurl.replace(' ', '%20'), # space escaping in new url if needed.
89 headers=req.headers,
90 origin_req_host=req.origin_req_host,
91 unverifiable=True,
92 data=req.data,
93 )
94 new_request.method = m
95 return new_request
96
97
def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
    """Render the registry into a ``(status, headers, body)`` response triple.

    The exposition format is picked from ``accept_header``. When ``params``
    contains ``name[]`` the registry is first restricted to those names. The
    body is gzip-compressed when compression is not disabled and
    ``accept_encoding_header`` lists gzip.
    """
    # Content negotiation: pick the text format matching the Accept header.
    encoder, content_type = choose_encoder(accept_header)
    # An optional ``name[]`` query parameter narrows the exported metrics.
    selected = registry.restricted_registry(params['name[]']) if 'name[]' in params else registry
    body = encoder(selected)
    response_headers = [('Content-Type', content_type)]
    # Compress only when allowed by the caller and accepted by the client.
    if not disable_compression and gzip_accepted(accept_encoding_header):
        body = gzip.compress(body)
        response_headers.append(('Content-Encoding', 'gzip'))
    return '200 OK', response_headers, body
111
112
def make_wsgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable:
    """Create a WSGI app which serves the metrics from a registry.

    `registry` is the CollectorRegistry whose metrics are exposed.
    `disable_compression` turns off gzip compression of the response even
        when the client advertises support for it.

    Returns a WSGI callable ``app(environ, start_response)``. Requests for
    ``/favicon.ico`` get an empty ``200 OK`` response; every other path gets
    the rendered metrics.
    """

    def prometheus_app(environ, start_response):
        # PEP 3333 allows a server to omit PATH_INFO when it is empty, so it
        # must be looked up with .get() rather than indexed.
        if environ.get('PATH_INFO') == '/favicon.ico':
            # Serve an empty response for browsers. No headers are sent: a
            # header with an empty name is malformed and strict servers
            # reject it.
            status, headers, output = '200 OK', [], b''
        else:
            # Prepare parameters
            accept_header = environ.get('HTTP_ACCEPT')
            accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
            params = parse_qs(environ.get('QUERY_STRING', ''))
            # Bake output
            status, headers, output = _bake_output(
                registry, accept_header, accept_encoding_header, params, disable_compression)
        # Return output
        start_response(status, headers)
        return [output]

    return prometheus_app
134
135
136class _SilentHandler(WSGIRequestHandler):
137 """WSGI handler that does not log requests."""
138
139 def log_message(self, format, *args):
140 """Log nothing."""
141
142
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles each request in its own thread.

    Threading behaviour comes from ``ThreadingMixIn``; the WSGI serving
    logic comes from ``WSGIServer``.
    """
    # Make worker threads "fire and forget". Beginning with Python 3.7 this
    # prevents a memory leak because ``ThreadingMixIn`` starts to gather all
    # non-daemon threads in a list in order to join on them at server close.
    daemon_threads = True
149
150
151def _get_best_family(address, port):
152 """Automatically select address family depending on address"""
153 # HTTPServer defaults to AF_INET, which will not start properly if
154 # binding an ipv6 address is requested.
155 # This function is based on what upstream python did for http.server
156 # in https://github.com/python/cpython/pull/11767
157 infos = socket.getaddrinfo(address, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE)
158 family, _, _, _, sockaddr = next(iter(infos))
159 return family, sockaddr[0]
160
161
162def _get_ssl_ctx(
163 certfile: str,
164 keyfile: str,
165 protocol: int,
166 cafile: Optional[str] = None,
167 capath: Optional[str] = None,
168 client_auth_required: bool = False,
169) -> ssl.SSLContext:
170 """Load context supports SSL."""
171 ssl_cxt = ssl.SSLContext(protocol=protocol)
172
173 if cafile is not None or capath is not None:
174 try:
175 ssl_cxt.load_verify_locations(cafile, capath)
176 except IOError as exc:
177 exc_type = type(exc)
178 msg = str(exc)
179 raise exc_type(f"Cannot load CA certificate chain from file "
180 f"{cafile!r} or directory {capath!r}: {msg}")
181 else:
182 try:
183 ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
184 except IOError as exc:
185 exc_type = type(exc)
186 msg = str(exc)
187 raise exc_type(f"Cannot load default CA certificate chain: {msg}")
188
189 if client_auth_required:
190 ssl_cxt.verify_mode = ssl.CERT_REQUIRED
191
192 try:
193 ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile)
194 except IOError as exc:
195 exc_type = type(exc)
196 msg = str(exc)
197 raise exc_type(f"Cannot load server certificate file {certfile!r} or "
198 f"its private key file {keyfile!r}: {msg}")
199
200 return ssl_cxt
201
202
def start_wsgi_server(
        port: int,
        addr: str = '0.0.0.0',
        registry: CollectorRegistry = REGISTRY,
        certfile: Optional[str] = None,
        keyfile: Optional[str] = None,
        client_cafile: Optional[str] = None,
        client_capath: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_SERVER,
        client_auth_required: bool = False,
) -> Tuple[WSGIServer, threading.Thread]:
    """Starts a WSGI server for prometheus metrics as a daemon thread.

    `port` / `addr` are the port and address to bind to. The address family
        is derived from `addr`.
    `registry` is the CollectorRegistry to expose.
    `certfile` / `keyfile` enable TLS when BOTH are given. Otherwise the
        server speaks plain HTTP.
    `client_cafile` / `client_capath` / `client_auth_required` / `protocol`
        are forwarded to the TLS context and only used when TLS is enabled.

    Returns the server object and the daemon thread running
    ``serve_forever``.
    """

    class TmpServer(ThreadingWSGIServer):
        """Copy of ThreadingWSGIServer to update address_family locally"""

    TmpServer.address_family, addr = _get_best_family(addr, port)

    # Load the certificates BEFORE binding the port: if they cannot be
    # loaded the exception propagates without leaving a bound listening
    # socket behind.
    context = None
    if certfile and keyfile:
        context = _get_ssl_ctx(certfile, keyfile, protocol, client_cafile, client_capath, client_auth_required)

    app = make_wsgi_app(registry)
    httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler)
    if context is not None:
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    t = threading.Thread(target=httpd.serve_forever)
    t.daemon = True
    t.start()

    return httpd, t


# Alias: the same server under a second public name.
start_http_server = start_wsgi_server
233
234
def generate_latest(registry: CollectorRegistry = REGISTRY) -> bytes:
    """Serialize every metric in ``registry`` to the latest text format.

    Returns the exposition as UTF-8 encoded bytes. OpenMetrics metric types
    are mapped onto Prometheus text-format types, and OpenMetrics-only
    samples (``_created``, ``_gsum``, ``_gcount``) are emitted as separate
    gauge families after the metric's regular samples.

    If anything fails while rendering a metric, the metric is appended to
    the exception's ``args`` before the exception is re-raised.
    """
    # OpenMetrics type -> suffix appended to the exposed family name.
    name_suffixes = {'counter': '_total', 'info': '_info'}
    # OpenMetrics type -> Prometheus text-format type.
    type_aliases = {
        'info': 'gauge',
        'stateset': 'gauge',
        # A gauge histogram is really a gauge,
        # but this captures the structure better.
        'gaugehistogram': 'histogram',
        'unknown': 'untyped',
    }
    om_only_suffixes = ('_created', '_gsum', '_gcount')

    def escape_help(text):
        return text.replace('\\', r'\\').replace('\n', r'\n')

    def escape_label_value(text):
        return text.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')

    def render_sample(sample):
        labelstr = ''
        if sample.labels:
            pairs = [
                '{}="{}"'.format(key, escape_label_value(value))
                for key, value in sorted(sample.labels.items())
            ]
            labelstr = '{{{0}}}'.format(','.join(pairs))
        if sample.timestamp is None:
            timestamp = ''
        else:
            # Convert to milliseconds.
            timestamp = f' {int(float(sample.timestamp) * 1000):d}'
        return f'{sample.name}{labelstr} {floatToGoString(sample.value)}{timestamp}\n'

    chunks = []
    for metric in registry.collect():
        try:
            family, kind = metric.name, metric.type
            # Munging from OpenMetrics into Prometheus format.
            mname = family + name_suffixes.get(kind, '')
            mtype = type_aliases.get(kind, kind)

            chunks.append('# HELP {} {}\n'.format(mname, escape_help(metric.documentation)))
            chunks.append(f'# TYPE {mname} {mtype}\n')

            om_samples: Dict[str, List[str]] = {}
            for sample in metric.samples:
                om_suffix = next(
                    (sfx for sfx in om_only_suffixes if sample.name == family + sfx),
                    None,
                )
                rendered = render_sample(sample)
                if om_suffix is None:
                    chunks.append(rendered)
                else:
                    # OpenMetrics specific sample, put in a gauge at the end.
                    om_samples.setdefault(om_suffix, []).append(rendered)
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            chunks.append('# HELP {}{} {}\n'.format(
                metric.name, suffix, escape_help(metric.documentation)))
            chunks.append(f'# TYPE {metric.name}{suffix} gauge\n')
            chunks.extend(lines)
    return ''.join(chunks).encode('utf-8')
295
296
def choose_encoder(accept_header: Optional[str]) -> Tuple[Callable[[CollectorRegistry], bytes], str]:
    """Pick the exposition encoder matching an HTTP ``Accept`` header.

    `accept_header` is the raw header value; None or '' means the client
        expressed no preference.

    Returns ``(encoder, content_type)``: the OpenMetrics encoder when the
    header lists ``application/openmetrics-text`` (media type parameters are
    ignored), otherwise the Prometheus text-format encoder.
    """
    for accepted in (accept_header or '').split(','):
        # Media types are case-insensitive, so normalise before comparing
        # (gzip_accepted does the same for content codings).
        media_type = accepted.split(';')[0].strip().lower()
        if media_type == 'application/openmetrics-text':
            return (openmetrics.generate_latest,
                    openmetrics.CONTENT_TYPE_LATEST)
    return generate_latest, CONTENT_TYPE_LATEST
304
305
def _qvalue_is_zero(params: List[str]) -> bool:
    """Return True when the parameters carry an explicit ``q`` weight of 0."""
    for param in params:
        name, _, value = param.partition('=')
        if name.strip().lower() != 'q':
            continue
        try:
            return float(value) == 0
        except ValueError:
            # Malformed weight: stay lenient and treat the coding as accepted.
            return False
    return False


def gzip_accepted(accept_encoding_header: Optional[str]) -> bool:
    """Tell whether an ``Accept-Encoding`` header permits a gzip response.

    `accept_encoding_header` is the raw header value; None or '' means no
        codings were listed.

    Returns True when ``gzip`` (compared case-insensitively) is listed and
    not explicitly refused. An entry such as ``gzip;q=0`` means "not
    acceptable" and does not count.
    """
    for accepted in (accept_encoding_header or '').split(','):
        coding, *params = accepted.split(';')
        if coding.strip().lower() == 'gzip' and not _qvalue_is_zero(params):
            return True
    return False
312
313
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that gives metrics from ``REGISTRY``."""
    # Registry served by this handler. ``factory`` creates subclasses that
    # override it with another registry.
    registry: CollectorRegistry = REGISTRY

    def do_GET(self) -> None:
        """Answer a GET request with the rendered metrics of ``registry``."""
        request_headers = self.headers
        # Compression is never disabled for this handler.
        status, headers, output = _bake_output(
            self.registry,
            request_headers.get('Accept'),
            request_headers.get('Accept-Encoding'),
            parse_qs(urlparse(self.path).query),
            False,
        )
        # ``status`` looks like '200 OK'; send_response wants the number.
        status_code = int(status.partition(' ')[0])
        self.send_response(status_code)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(output)

    def log_message(self, format: str, *args: Any) -> None:
        """Log nothing."""

    @classmethod
    def factory(cls, registry: CollectorRegistry) -> type:
        """Return a dynamically created subclass bound to ``registry``.

        The subclass keeps this class's name and only overrides the
        ``registry`` class attribute that ``do_GET`` reads.
        """
        handler_name = str(cls.__name__)
        bound_handler = type(handler_name, (cls, object), {"registry": registry})
        return bound_handler
350
351
def write_to_textfile(path: str, registry: CollectorRegistry) -> None:
    """Write metrics to the given path.

    This is intended for use with the Node exporter textfile collector.
    The path must end in .prom for the textfile collector to process it.

    The metrics are written to a temporary file next to `path` (suffixed
    with the process id and thread id) which is then moved over `path`.
    If writing or moving fails, the temporary file is removed and the
    exception is re-raised.
    """
    # Render first so that a failing collector never creates a temp file.
    payload = generate_latest(registry)
    tmppath = f'{path}.{os.getpid()}.{threading.current_thread().ident}'
    try:
        with open(tmppath, 'wb') as f:
            f.write(payload)
        # os.replace overwrites an existing destination on every platform
        # (plain rename fails on Windows if the destination exists).
        os.replace(tmppath, path)
    except Exception:
        # Do not leave a stray temporary file behind on failure.
        if os.path.exists(tmppath):
            os.remove(tmppath)
        raise
366
367
368def _make_handler(
369 url: str,
370 method: str,
371 timeout: Optional[float],
372 headers: Sequence[Tuple[str, str]],
373 data: bytes,
374 base_handler: Union[BaseHandler, type],
375) -> Callable[[], None]:
376 def handle() -> None:
377 request = Request(url, data=data)
378 request.get_method = lambda: method # type: ignore
379 for k, v in headers:
380 request.add_header(k, v)
381 resp = build_opener(base_handler).open(request, timeout=timeout)
382 if resp.code >= 400:
383 raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}")
384
385 return handle
386
387
def default_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """Return a callable that sends the request over plain HTTP/HTTPS.

    This is the default handler of the push_to_gateway family of functions
    and can be re-used as a building block by other handlers."""
    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=HTTPHandler,
    )
400
401
def passthrough_redirect_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """
    Return a callable that follows redirects for every HTTP method.

    Extends the standard HTTPRedirectHandler behaviour: PUT requests are
    allowed, the method is kept across the redirect, and all headers and
    data of the original request are passed through. The identical request
    (same method and data) is simply repeated against the redirect URL.

    Only use this handler if you control or trust the source of the
    redirect responses you encounter when making requests via the
    Prometheus client."""
    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=_PrometheusRedirectHandler,
    )
420
421
def basic_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        username: Optional[str] = None,
        password: Optional[str] = None,
) -> Callable[[], None]:
    """Handler that implements HTTP/HTTPS connections with Basic Auth.

    Sets auth headers using supplied 'username' and 'password', if set.
    Used by the push_to_gateway functions. Can be re-used by other handlers.

    The Authorization header is only added when BOTH `username` and
    `password` are not None. The caller's `headers` list is never modified.
    """

    def handle():
        """Send the request, adding the Basic Auth header when configured."""
        # Work on a copy: appending to the caller's list would leak the
        # credentials into it and duplicate the header on repeated calls.
        request_headers = list(headers)
        if username is not None and password is not None:
            auth_value = f'{username}:{password}'.encode()
            auth_token = base64.b64encode(auth_value)
            auth_header = b'Basic ' + auth_token
            request_headers.append(('Authorization', auth_header))
        default_handler(url, method, timeout, request_headers, data)()

    return handle
447
448
def tls_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        certfile: str,
        keyfile: str,
        cafile: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_CLIENT,
        insecure_skip_verify: bool = False,
) -> Callable[[], None]:
    """Return a callable that sends the request over HTTPS with TLS Auth.

    `certfile` / `keyfile` are the client certificate chain and private key.
    `cafile` is the CA bundle used to verify the server. When it is None the
        default certificates are loaded.
    `protocol` is passed to ``ssl.SSLContext``. The default
        (ssl.PROTOCOL_TLS_CLIENT) also enables ssl.CERT_REQUIRED and
        SSLContext.check_hostname.
    `insecure_skip_verify` set to True disables hostname checking and
        certificate verification.

    Both this handler and the TLS feature on pushgateway are experimental."""
    context = ssl.SSLContext(protocol=protocol)
    if cafile is None:
        context.load_default_certs()
    else:
        context.load_verify_locations(cafile)

    if insecure_skip_verify:
        # check_hostname has to be switched off before verify_mode can be
        # lowered to CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    https_handler = HTTPSHandler(context=context)
    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=https_handler,
    )
481
482
def push_to_gateway(
        gateway: str,
        job: str,
        registry: CollectorRegistry,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """Push metrics to the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is a function used to perform the requests to the 'gateway'.
              Defaults to `default_handler`, which carries out a plain
              http or https request.
              A custom handler must accept the keyword arguments
              url, method, timeout, headers and data, and return a
              zero-argument callable that performs the request.
              It may be used to implement functionality the default
              handler lacks (such as SSL client certificates and HTTP
              authentication mechanisms).
              'url' is the URL for the request; it is built from the
              'gateway' argument described above.
              'method' is the HTTP method to use for the request.
              'timeout': requests not successfully completed after this
              many seconds should be aborted. If timeout is None, the
              handler should not set a timeout.
              'headers' is a list of ("header-name", "header-value")
              tuples which must be sent to the pushgateway as HTTP
              request headers.
              'data' is the content to use as the HTTP message body.
              The returned callable should raise an exception
              (e.g. IOError) on failure.

    This overwrites all metrics with the same job and grouping_key.
    This uses the PUT HTTP method."""
    _use_gateway(
        method='PUT',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
530
531
def pushadd_to_gateway(
        gateway: str,
        job: str,
        registry: Optional[CollectorRegistry],
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """PushAdd metrics to the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is a function used to perform the requests to the 'gateway'.
              Defaults to `default_handler`, which carries out a plain
              http or https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.

    This replaces metrics with the same name, job and grouping_key.
    This uses the POST HTTP method."""
    _use_gateway(
        method='POST',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
561
562
def delete_from_gateway(
        gateway: str,
        job: str,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """Delete metrics from the given pushgateway.

    `gateway` is the url of your push gateway, either of the form
              'http://pushgateway.local' or 'pushgateway.local'.
              The scheme defaults to 'http' if none is provided.
    `job` is the job label of the metrics to delete.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long delete will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is a function used to perform the requests to the 'gateway'.
              Defaults to `default_handler`, which carries out a plain
              http or https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.

    This deletes metrics with the given job and grouping_key.
    This uses the DELETE HTTP method."""
    # No registry is involved: a DELETE request carries an empty body.
    _use_gateway(
        method='DELETE',
        gateway=gateway,
        job=job,
        registry=None,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )
590
591
def _use_gateway(
        method: str,
        gateway: str,
        job: str,
        registry: Optional[CollectorRegistry],
        grouping_key: Optional[Dict[str, Any]],
        timeout: Optional[float],
        handler: Callable,
) -> None:
    """Build the pushgateway URL and body, then run the request via `handler`.

    The URL is ``<gateway>/metrics/job/<job>`` followed by one
    ``/<key>/<value>`` pair per grouping key, sorted by key. For every
    method except DELETE the body is the text exposition of `registry`
    (the default REGISTRY when `registry` is None). DELETE sends an empty
    body.
    """
    # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
    scheme = urlparse(gateway).scheme
    if scheme not in ('http', 'https'):
        # Missing or unrecognised scheme: treat the value as a bare host.
        gateway = f'http://{gateway}'

    base = gateway.rstrip('/')
    job_key, job_value = _escape_grouping_key("job", job)
    url = '{}/metrics/{}/{}'.format(base, job_key, job_value)

    data = b''
    if method != 'DELETE':
        source = REGISTRY if registry is None else registry
        data = generate_latest(source)

    pairs = {} if grouping_key is None else grouping_key
    segments = []
    for k, v in sorted(pairs.items()):
        segments.append('/{}/{}'.format(*_escape_grouping_key(str(k), str(v))))
    url += ''.join(segments)

    send = handler(
        url=url, method=method, timeout=timeout,
        headers=[('Content-Type', CONTENT_TYPE_LATEST)], data=data,
    )
    send()
625
626
627def _escape_grouping_key(k, v):
628 if v == "":
629 # Per https://github.com/prometheus/pushgateway/pull/346.
630 return k + "@base64", "="
631 elif '/' in v:
632 # Added in Pushgateway 0.9.0.
633 return k + "@base64", base64.urlsafe_b64encode(v.encode("utf-8")).decode("utf-8")
634 else:
635 return k, quote_plus(v)
636
637
def instance_ip_grouping_key() -> Dict[str, Any]:
    """Grouping key with instance set to the IP Address of this host."""
    if sys.platform == 'darwin':
        # On MacOS the localhost method does not work, so a different
        # target address is used there.
        # This method was adapted from this StackOverflow answer:
        # https://stackoverflow.com/a/28950776
        probe_target = ('10.255.255.255', 1)
    else:
        probe_target = ('localhost', 0)

    with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
        sock.connect(probe_target)
        # The local end of the connected socket gives the address to report.
        local_ip = sock.getsockname()[0]
        return {'instance': local_ip}
652
653
654from .asgi import make_asgi_app # noqa