Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/exposition.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

311 statements  

1import base64 

2from contextlib import closing 

3from functools import partial 

4import gzip 

5from http.server import BaseHTTPRequestHandler 

6import os 

7import socket 

8from socketserver import ThreadingMixIn 

9import ssl 

10import sys 

11import threading 

12from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

13from urllib.error import HTTPError 

14from urllib.parse import parse_qs, quote_plus, urlparse 

15from urllib.request import ( 

16 BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler, 

17 Request, 

18) 

19from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer 

20 

21from .openmetrics import exposition as openmetrics 

22from .registry import CollectorRegistry, REGISTRY 

23from .utils import floatToGoString, parse_version 

24 

25__all__ = ( 

26 'CONTENT_TYPE_LATEST', 

27 'CONTENT_TYPE_PLAIN_0_0_4', 

28 'CONTENT_TYPE_PLAIN_1_0_0', 

29 'delete_from_gateway', 

30 'generate_latest', 

31 'instance_ip_grouping_key', 

32 'make_asgi_app', 

33 'make_wsgi_app', 

34 'MetricsHandler', 

35 'push_to_gateway', 

36 'pushadd_to_gateway', 

37 'start_http_server', 

38 'start_wsgi_server', 

39 'write_to_textfile', 

40) 

41 

42CONTENT_TYPE_PLAIN_0_0_4 = 'text/plain; version=0.0.4; charset=utf-8' 

43"""Content type of the compatibility format""" 

44 

45CONTENT_TYPE_PLAIN_1_0_0 = 'text/plain; version=1.0.0; charset=utf-8' 

46"""Content type of the latest format""" 

47 

48CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0 

49 

50 

51class _PrometheusRedirectHandler(HTTPRedirectHandler): 

52 """ 

53 Allow additional methods (e.g. PUT) and data forwarding in redirects. 

54 

55 Use of this class constitute a user's explicit agreement to the 

56 redirect responses the Prometheus client will receive when using it. 

57 You should only use this class if you control or otherwise trust the 

58 redirect behavior involved and are certain it is safe to full transfer 

59 the original request (method and data) to the redirected URL. For 

60 example, if you know there is a cosmetic URL redirect in front of a 

61 local deployment of a Prometheus server, and all redirects are safe, 

62 this is the class to use to handle redirects in that case. 

63 

64 The standard HTTPRedirectHandler does not forward request data nor 

65 does it allow redirected PUT requests (which Prometheus uses for some 

66 operations, for example `push_to_gateway`) because these cannot 

67 generically guarantee no violations of HTTP RFC 2616 requirements for 

68 the user to explicitly confirm redirects that could have unexpected 

69 side effects (such as rendering a PUT request non-idempotent or 

70 creating multiple resources not named in the original request). 

71 """ 

72 

73 def redirect_request(self, req, fp, code, msg, headers, newurl): 

74 """ 

75 Apply redirect logic to a request. 

76 

77 See parent HTTPRedirectHandler.redirect_request for parameter info. 

78 

79 If the redirect is disallowed, this raises the corresponding HTTP error. 

80 If the redirect can't be determined, return None to allow other handlers 

81 to try. If the redirect is allowed, return the new request. 

82 

83 This method specialized for the case when (a) the user knows that the 

84 redirect will not cause unacceptable side effects for any request method, 

85 and (b) the user knows that any request data should be passed through to 

86 the redirect. If either condition is not met, this should not be used. 

87 """ 

88 # note that requests being provided by a handler will use get_method to 

89 # indicate the method, by monkeypatching this, instead of setting the 

90 # Request object's method attribute. 

91 m = getattr(req, "method", req.get_method()) 

92 if not (code in (301, 302, 303, 307) and m in ("GET", "HEAD") 

93 or code in (301, 302, 303) and m in ("POST", "PUT")): 

94 raise HTTPError(req.full_url, code, msg, headers, fp) 

95 new_request = Request( 

96 newurl.replace(' ', '%20'), # space escaping in new url if needed. 

97 headers=req.headers, 

98 origin_req_host=req.origin_req_host, 

99 unverifiable=True, 

100 data=req.data, 

101 ) 

102 new_request.method = m 

103 return new_request 

104 

105 

106def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression): 

107 """Bake output for metrics output.""" 

108 # Choose the correct plain text format of the output. 

109 encoder, content_type = choose_encoder(accept_header) 

110 if 'name[]' in params: 

111 registry = registry.restricted_registry(params['name[]']) 

112 output = encoder(registry) 

113 headers = [('Content-Type', content_type)] 

114 # If gzip encoding required, gzip the output. 

115 if not disable_compression and gzip_accepted(accept_encoding_header): 

116 output = gzip.compress(output) 

117 headers.append(('Content-Encoding', 'gzip')) 

118 return '200 OK', headers, output 

119 

120 

121def make_wsgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable: 

122 """Create a WSGI app which serves the metrics from a registry.""" 

123 

124 def prometheus_app(environ, start_response): 

125 # Prepare parameters 

126 accept_header = environ.get('HTTP_ACCEPT') 

127 accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING') 

128 params = parse_qs(environ.get('QUERY_STRING', '')) 

129 method = environ['REQUEST_METHOD'] 

130 

131 if method == 'OPTIONS': 

132 status = '200 OK' 

133 headers = [('Allow', 'OPTIONS,GET')] 

134 output = b'' 

135 elif method != 'GET': 

136 status = '405 Method Not Allowed' 

137 headers = [('Allow', 'OPTIONS,GET')] 

138 output = '# HTTP {}: {}; use OPTIONS or GET\n'.format(status, method).encode() 

139 elif environ['PATH_INFO'] == '/favicon.ico': 

140 # Serve empty response for browsers 

141 status = '200 OK' 

142 headers = [] 

143 output = b'' 

144 else: 

145 # Note: For backwards compatibility, the URI path for GET is not 

146 # constrained to the documented /metrics, but any path is allowed. 

147 # Bake output 

148 status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression) 

149 # Return output 

150 start_response(status, headers) 

151 return [output] 

152 

153 return prometheus_app 

154 

155 

156class _SilentHandler(WSGIRequestHandler): 

157 """WSGI handler that does not log requests.""" 

158 

159 def log_message(self, format, *args): 

160 """Log nothing.""" 

161 

162 

163class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): 

164 """Thread per request HTTP server.""" 

165 # Make worker threads "fire and forget". Beginning with Python 3.7 this 

166 # prevents a memory leak because ``ThreadingMixIn`` starts to gather all 

167 # non-daemon threads in a list in order to join on them at server close. 

168 daemon_threads = True 

169 

170 

171def _get_best_family(address, port): 

172 """Automatically select address family depending on address""" 

173 # HTTPServer defaults to AF_INET, which will not start properly if 

174 # binding an ipv6 address is requested. 

175 # This function is based on what upstream python did for http.server 

176 # in https://github.com/python/cpython/pull/11767 

177 infos = socket.getaddrinfo(address, port, type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE) 

178 family, _, _, _, sockaddr = next(iter(infos)) 

179 return family, sockaddr[0] 

180 

181 

182def _get_ssl_ctx( 

183 certfile: str, 

184 keyfile: str, 

185 protocol: int, 

186 cafile: Optional[str] = None, 

187 capath: Optional[str] = None, 

188 client_auth_required: bool = False, 

189) -> ssl.SSLContext: 

190 """Load context supports SSL.""" 

191 ssl_cxt = ssl.SSLContext(protocol=protocol) 

192 

193 if cafile is not None or capath is not None: 

194 try: 

195 ssl_cxt.load_verify_locations(cafile, capath) 

196 except IOError as exc: 

197 exc_type = type(exc) 

198 msg = str(exc) 

199 raise exc_type(f"Cannot load CA certificate chain from file " 

200 f"{cafile!r} or directory {capath!r}: {msg}") 

201 else: 

202 try: 

203 ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH) 

204 except IOError as exc: 

205 exc_type = type(exc) 

206 msg = str(exc) 

207 raise exc_type(f"Cannot load default CA certificate chain: {msg}") 

208 

209 if client_auth_required: 

210 ssl_cxt.verify_mode = ssl.CERT_REQUIRED 

211 

212 try: 

213 ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile) 

214 except IOError as exc: 

215 exc_type = type(exc) 

216 msg = str(exc) 

217 raise exc_type(f"Cannot load server certificate file {certfile!r} or " 

218 f"its private key file {keyfile!r}: {msg}") 

219 

220 return ssl_cxt 

221 

222 

223def start_wsgi_server( 

224 port: int, 

225 addr: str = '0.0.0.0', 

226 registry: CollectorRegistry = REGISTRY, 

227 certfile: Optional[str] = None, 

228 keyfile: Optional[str] = None, 

229 client_cafile: Optional[str] = None, 

230 client_capath: Optional[str] = None, 

231 protocol: int = ssl.PROTOCOL_TLS_SERVER, 

232 client_auth_required: bool = False, 

233) -> Tuple[WSGIServer, threading.Thread]: 

234 """Starts a WSGI server for prometheus metrics as a daemon thread.""" 

235 

236 class TmpServer(ThreadingWSGIServer): 

237 """Copy of ThreadingWSGIServer to update address_family locally""" 

238 

239 TmpServer.address_family, addr = _get_best_family(addr, port) 

240 app = make_wsgi_app(registry) 

241 httpd = make_server(addr, port, app, TmpServer, handler_class=_SilentHandler) 

242 if certfile and keyfile: 

243 context = _get_ssl_ctx(certfile, keyfile, protocol, client_cafile, client_capath, client_auth_required) 

244 httpd.socket = context.wrap_socket(httpd.socket, server_side=True) 

245 t = threading.Thread(target=httpd.serve_forever) 

246 t.daemon = True 

247 t.start() 

248 

249 return httpd, t 

250 

251 

252start_http_server = start_wsgi_server 

253 

254 

255def generate_latest(registry: CollectorRegistry = REGISTRY, escaping: str = openmetrics.UNDERSCORES) -> bytes: 

256 """ 

257 Generates the exposition format using the basic Prometheus text format. 

258 

259 Params: 

260 registry: CollectorRegistry to export data from. 

261 escaping: Escaping scheme used for metric and label names. 

262 

263 Returns: UTF-8 encoded string containing the metrics in text format. 

264 """ 

265 

266 def sample_line(samples): 

267 if samples.labels: 

268 labelstr = '{0}'.format(','.join( 

269 # Label values always support UTF-8 

270 ['{}="{}"'.format( 

271 openmetrics.escape_label_name(k, escaping), openmetrics._escape(v, openmetrics.ALLOWUTF8, False)) 

272 for k, v in sorted(samples.labels.items())])) 

273 else: 

274 labelstr = '' 

275 timestamp = '' 

276 if samples.timestamp is not None: 

277 # Convert to milliseconds. 

278 timestamp = f' {int(float(samples.timestamp) * 1000):d}' 

279 if escaping != openmetrics.ALLOWUTF8 or openmetrics._is_valid_legacy_metric_name(samples.name): 

280 if labelstr: 

281 labelstr = '{{{0}}}'.format(labelstr) 

282 return f'{openmetrics.escape_metric_name(samples.name, escaping)}{labelstr} {floatToGoString(samples.value)}{timestamp}\n' 

283 maybe_comma = '' 

284 if labelstr: 

285 maybe_comma = ',' 

286 return f'{{{openmetrics.escape_metric_name(samples.name, escaping)}{maybe_comma}{labelstr}}} {floatToGoString(samples.value)}{timestamp}\n' 

287 

288 output = [] 

289 for metric in registry.collect(): 

290 try: 

291 mname = metric.name 

292 mtype = metric.type 

293 # Munging from OpenMetrics into Prometheus format. 

294 if mtype == 'counter': 

295 mname = mname + '_total' 

296 elif mtype == 'info': 

297 mname = mname + '_info' 

298 mtype = 'gauge' 

299 elif mtype == 'stateset': 

300 mtype = 'gauge' 

301 elif mtype == 'gaugehistogram': 

302 # A gauge histogram is really a gauge, 

303 # but this captures the structure better. 

304 mtype = 'histogram' 

305 elif mtype == 'unknown': 

306 mtype = 'untyped' 

307 

308 output.append('# HELP {} {}\n'.format( 

309 openmetrics.escape_metric_name(mname, escaping), metric.documentation.replace('\\', r'\\').replace('\n', r'\n'))) 

310 output.append(f'# TYPE {openmetrics.escape_metric_name(mname, escaping)} {mtype}\n') 

311 

312 om_samples: Dict[str, List[str]] = {} 

313 for s in metric.samples: 

314 for suffix in ['_created', '_gsum', '_gcount']: 

315 if s.name == metric.name + suffix: 

316 # OpenMetrics specific sample, put in a gauge at the end. 

317 om_samples.setdefault(suffix, []).append(sample_line(s)) 

318 break 

319 else: 

320 output.append(sample_line(s)) 

321 except Exception as exception: 

322 exception.args = (exception.args or ('',)) + (metric,) 

323 raise 

324 

325 for suffix, lines in sorted(om_samples.items()): 

326 output.append('# HELP {} {}\n'.format(openmetrics.escape_metric_name(metric.name + suffix, escaping), 

327 metric.documentation.replace('\\', r'\\').replace('\n', r'\n'))) 

328 output.append(f'# TYPE {openmetrics.escape_metric_name(metric.name + suffix, escaping)} gauge\n') 

329 output.extend(lines) 

330 return ''.join(output).encode('utf-8') 

331 

332 

333def choose_encoder(accept_header: str) -> Tuple[Callable[[CollectorRegistry], bytes], str]: 

334 # Python client library accepts a narrower range of content-types than 

335 # Prometheus does. 

336 accept_header = accept_header or '' 

337 escaping = openmetrics.UNDERSCORES 

338 for accepted in accept_header.split(','): 

339 if accepted.split(';')[0].strip() == 'application/openmetrics-text': 

340 toks = accepted.split(';') 

341 version = _get_version(toks) 

342 escaping = _get_escaping(toks) 

343 # Only return an escaping header if we have a good version and 

344 # mimetype. 

345 if not version: 

346 return (partial(openmetrics.generate_latest, escaping=openmetrics.UNDERSCORES, version="1.0.0"), openmetrics.CONTENT_TYPE_LATEST) 

347 if version and parse_version(version) >= (1, 0, 0): 

348 return (partial(openmetrics.generate_latest, escaping=escaping, version=version), 

349 f'application/openmetrics-text; version={version}; charset=utf-8; escaping=' + str(escaping)) 

350 elif accepted.split(';')[0].strip() == 'text/plain': 

351 toks = accepted.split(';') 

352 version = _get_version(toks) 

353 escaping = _get_escaping(toks) 

354 # Only return an escaping header if we have a good version and 

355 # mimetype. 

356 if version and parse_version(version) >= (1, 0, 0): 

357 return (partial(generate_latest, escaping=escaping), 

358 CONTENT_TYPE_LATEST + '; escaping=' + str(escaping)) 

359 return generate_latest, CONTENT_TYPE_PLAIN_0_0_4 

360 

361 

362def _get_version(accept_header: List[str]) -> str: 

363 """Return the version tag from the Accept header. 

364 

365 If no version is specified, returns empty string.""" 

366 

367 for tok in accept_header: 

368 if '=' not in tok: 

369 continue 

370 key, value = tok.strip().split('=', 1) 

371 if key == 'version': 

372 return value 

373 return "" 

374 

375 

376def _get_escaping(accept_header: List[str]) -> str: 

377 """Return the escaping scheme from the Accept header. 

378 

379 If no escaping scheme is specified or the scheme is not one of the allowed 

380 strings, defaults to UNDERSCORES.""" 

381 

382 for tok in accept_header: 

383 if '=' not in tok: 

384 continue 

385 key, value = tok.strip().split('=', 1) 

386 if key != 'escaping': 

387 continue 

388 if value == openmetrics.ALLOWUTF8: 

389 return openmetrics.ALLOWUTF8 

390 elif value == openmetrics.UNDERSCORES: 

391 return openmetrics.UNDERSCORES 

392 elif value == openmetrics.DOTS: 

393 return openmetrics.DOTS 

394 elif value == openmetrics.VALUES: 

395 return openmetrics.VALUES 

396 else: 

397 return openmetrics.UNDERSCORES 

398 return openmetrics.UNDERSCORES 

399 

400 

401def gzip_accepted(accept_encoding_header: str) -> bool: 

402 accept_encoding_header = accept_encoding_header or '' 

403 for accepted in accept_encoding_header.split(','): 

404 if accepted.split(';')[0].strip().lower() == 'gzip': 

405 return True 

406 return False 

407 

408 

409class MetricsHandler(BaseHTTPRequestHandler): 

410 """HTTP handler that gives metrics from ``REGISTRY``.""" 

411 registry: CollectorRegistry = REGISTRY 

412 

413 def do_GET(self) -> None: 

414 # Prepare parameters 

415 registry = self.registry 

416 accept_header = self.headers.get('Accept') 

417 accept_encoding_header = self.headers.get('Accept-Encoding') 

418 params = parse_qs(urlparse(self.path).query) 

419 # Bake output 

420 status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, False) 

421 # Return output 

422 self.send_response(int(status.split(' ')[0])) 

423 for header in headers: 

424 self.send_header(*header) 

425 self.end_headers() 

426 self.wfile.write(output) 

427 

428 def log_message(self, format: str, *args: Any) -> None: 

429 """Log nothing.""" 

430 

431 @classmethod 

432 def factory(cls, registry: CollectorRegistry) -> type: 

433 """Returns a dynamic MetricsHandler class tied 

434 to the passed registry. 

435 """ 

436 # This implementation relies on MetricsHandler.registry 

437 # (defined above and defaulted to REGISTRY). 

438 

439 # As we have unicode_literals, we need to create a str() 

440 # object for type(). 

441 cls_name = str(cls.__name__) 

442 MyMetricsHandler = type(cls_name, (cls, object), 

443 {"registry": registry}) 

444 return MyMetricsHandler 

445 

446 

447def write_to_textfile(path: str, registry: CollectorRegistry, escaping: str = openmetrics.ALLOWUTF8, tmpdir: Optional[str] = None) -> None: 

448 """Write metrics to the given path. 

449 

450 This is intended for use with the Node exporter textfile collector. 

451 The path must end in .prom for the textfile collector to process it. 

452 

453 An optional tmpdir parameter can be set to determine where the 

454 metrics will be temporarily written to. If not set, it will be in 

455 the same directory as the .prom file. If provided, the path MUST be 

456 on the same filesystem.""" 

457 if tmpdir is not None: 

458 filename = os.path.basename(path) 

459 tmppath = f'{os.path.join(tmpdir, filename)}.{os.getpid()}.{threading.current_thread().ident}' 

460 else: 

461 tmppath = f'{path}.{os.getpid()}.{threading.current_thread().ident}' 

462 try: 

463 with open(tmppath, 'wb') as f: 

464 f.write(generate_latest(registry, escaping)) 

465 

466 # rename(2) is atomic but fails on Windows if the destination file exists 

467 if os.name == 'nt': 

468 os.replace(tmppath, path) 

469 else: 

470 os.rename(tmppath, path) 

471 except Exception: 

472 if os.path.exists(tmppath): 

473 os.remove(tmppath) 

474 raise 

475 

476 

477def _make_handler( 

478 url: str, 

479 method: str, 

480 timeout: Optional[float], 

481 headers: Sequence[Tuple[str, str]], 

482 data: bytes, 

483 base_handler: Union[BaseHandler, type], 

484) -> Callable[[], None]: 

485 def handle() -> None: 

486 request = Request(url, data=data) 

487 request.get_method = lambda: method # type: ignore 

488 for k, v in headers: 

489 request.add_header(k, v) 

490 resp = build_opener(base_handler).open(request, timeout=timeout) 

491 if resp.code >= 400: 

492 raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}") 

493 

494 return handle 

495 

496 

497def default_handler( 

498 url: str, 

499 method: str, 

500 timeout: Optional[float], 

501 headers: List[Tuple[str, str]], 

502 data: bytes, 

503) -> Callable[[], None]: 

504 """Default handler that implements HTTP/HTTPS connections. 

505 

506 Used by the push_to_gateway functions. Can be re-used by other handlers.""" 

507 

508 return _make_handler(url, method, timeout, headers, data, HTTPHandler) 

509 

510 

511def passthrough_redirect_handler( 

512 url: str, 

513 method: str, 

514 timeout: Optional[float], 

515 headers: List[Tuple[str, str]], 

516 data: bytes, 

517) -> Callable[[], None]: 

518 """ 

519 Handler that automatically trusts redirect responses for all HTTP methods. 

520 

521 Augments standard HTTPRedirectHandler capability by permitting PUT requests, 

522 preserving the method upon redirect, and passing through all headers and 

523 data from the original request. Only use this handler if you control or 

524 trust the source of redirect responses you encounter when making requests 

525 via the Prometheus client. This handler will simply repeat the identical 

526 request, including same method and data, to the new redirect URL.""" 

527 

528 return _make_handler(url, method, timeout, headers, data, _PrometheusRedirectHandler) 

529 

530 

531def basic_auth_handler( 

532 url: str, 

533 method: str, 

534 timeout: Optional[float], 

535 headers: List[Tuple[str, str]], 

536 data: bytes, 

537 username: Optional[str] = None, 

538 password: Optional[str] = None, 

539) -> Callable[[], None]: 

540 """Handler that implements HTTP/HTTPS connections with Basic Auth. 

541 

542 Sets auth headers using supplied 'username' and 'password', if set. 

543 Used by the push_to_gateway functions. Can be re-used by other handlers.""" 

544 

545 def handle(): 

546 """Handler that implements HTTP Basic Auth. 

547 """ 

548 if username is not None and password is not None: 

549 auth_value = f'{username}:{password}'.encode() 

550 auth_token = base64.b64encode(auth_value) 

551 auth_header = b'Basic ' + auth_token 

552 headers.append(('Authorization', auth_header)) 

553 default_handler(url, method, timeout, headers, data)() 

554 

555 return handle 

556 

557 

558def tls_auth_handler( 

559 url: str, 

560 method: str, 

561 timeout: Optional[float], 

562 headers: List[Tuple[str, str]], 

563 data: bytes, 

564 certfile: str, 

565 keyfile: str, 

566 cafile: Optional[str] = None, 

567 protocol: int = ssl.PROTOCOL_TLS_CLIENT, 

568 insecure_skip_verify: bool = False, 

569) -> Callable[[], None]: 

570 """Handler that implements an HTTPS connection with TLS Auth. 

571 

572 The default protocol (ssl.PROTOCOL_TLS_CLIENT) will also enable 

573 ssl.CERT_REQUIRED and SSLContext.check_hostname by default. This can be 

574 disabled by setting insecure_skip_verify to True. 

575 

576 Both this handler and the TLS feature on pushgateay are experimental.""" 

577 context = ssl.SSLContext(protocol=protocol) 

578 if cafile is not None: 

579 context.load_verify_locations(cafile) 

580 else: 

581 context.load_default_certs() 

582 

583 if insecure_skip_verify: 

584 context.check_hostname = False 

585 context.verify_mode = ssl.CERT_NONE 

586 

587 context.load_cert_chain(certfile=certfile, keyfile=keyfile) 

588 handler = HTTPSHandler(context=context) 

589 return _make_handler(url, method, timeout, headers, data, handler) 

590 

591 

592def push_to_gateway( 

593 gateway: str, 

594 job: str, 

595 registry: CollectorRegistry, 

596 grouping_key: Optional[Dict[str, Any]] = None, 

597 timeout: Optional[float] = 30, 

598 handler: Callable = default_handler, 

599) -> None: 

600 """Push metrics to the given pushgateway. 

601 

602 `gateway` the url for your push gateway. Either of the form 

603 'http://pushgateway.local', or 'pushgateway.local'. 

604 Scheme defaults to 'http' if none is provided 

605 `job` is the job label to be attached to all pushed metrics 

606 `registry` is an instance of CollectorRegistry 

607 `grouping_key` please see the pushgateway documentation for details. 

608 Defaults to None 

609 `timeout` is how long push will attempt to connect before giving up. 

610 Defaults to 30s, can be set to None for no timeout. 

611 `handler` is an optional function which can be provided to perform 

612 requests to the 'gateway'. 

613 Defaults to None, in which case an http or https request 

614 will be carried out by a default handler. 

615 If not None, the argument must be a function which accepts 

616 the following arguments: 

617 url, method, timeout, headers, and content 

618 May be used to implement additional functionality not 

619 supported by the built-in default handler (such as SSL 

620 client certicates, and HTTP authentication mechanisms). 

621 'url' is the URL for the request, the 'gateway' argument 

622 described earlier will form the basis of this URL. 

623 'method' is the HTTP method which should be used when 

624 carrying out the request. 

625 'timeout' requests not successfully completed after this 

626 many seconds should be aborted. If timeout is None, then 

627 the handler should not set a timeout. 

628 'headers' is a list of ("header-name","header-value") tuples 

629 which must be passed to the pushgateway in the form of HTTP 

630 request headers. 

631 The function should raise an exception (e.g. IOError) on 

632 failure. 

633 'content' is the data which should be used to form the HTTP 

634 Message Body. 

635 

636 This overwrites all metrics with the same job and grouping_key. 

637 This uses the PUT HTTP method.""" 

638 _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler) 

639 

640 

641def pushadd_to_gateway( 

642 gateway: str, 

643 job: str, 

644 registry: Optional[CollectorRegistry], 

645 grouping_key: Optional[Dict[str, Any]] = None, 

646 timeout: Optional[float] = 30, 

647 handler: Callable = default_handler, 

648) -> None: 

649 """PushAdd metrics to the given pushgateway. 

650 

651 `gateway` the url for your push gateway. Either of the form 

652 'http://pushgateway.local', or 'pushgateway.local'. 

653 Scheme defaults to 'http' if none is provided 

654 `job` is the job label to be attached to all pushed metrics 

655 `registry` is an instance of CollectorRegistry 

656 `grouping_key` please see the pushgateway documentation for details. 

657 Defaults to None 

658 `timeout` is how long push will attempt to connect before giving up. 

659 Defaults to 30s, can be set to None for no timeout. 

660 `handler` is an optional function which can be provided to perform 

661 requests to the 'gateway'. 

662 Defaults to None, in which case an http or https request 

663 will be carried out by a default handler. 

664 See the 'prometheus_client.push_to_gateway' documentation 

665 for implementation requirements. 

666 

667 This replaces metrics with the same name, job and grouping_key. 

668 This uses the POST HTTP method.""" 

669 _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler) 

670 

671 

672def delete_from_gateway( 

673 gateway: str, 

674 job: str, 

675 grouping_key: Optional[Dict[str, Any]] = None, 

676 timeout: Optional[float] = 30, 

677 handler: Callable = default_handler, 

678) -> None: 

679 """Delete metrics from the given pushgateway. 

680 

681 `gateway` the url for your push gateway. Either of the form 

682 'http://pushgateway.local', or 'pushgateway.local'. 

683 Scheme defaults to 'http' if none is provided 

684 `job` is the job label to be attached to all pushed metrics 

685 `grouping_key` please see the pushgateway documentation for details. 

686 Defaults to None 

687 `timeout` is how long delete will attempt to connect before giving up. 

688 Defaults to 30s, can be set to None for no timeout. 

689 `handler` is an optional function which can be provided to perform 

690 requests to the 'gateway'. 

691 Defaults to None, in which case an http or https request 

692 will be carried out by a default handler. 

693 See the 'prometheus_client.push_to_gateway' documentation 

694 for implementation requirements. 

695 

696 This deletes metrics with the given job and grouping_key. 

697 This uses the DELETE HTTP method.""" 

698 _use_gateway('DELETE', gateway, job, None, grouping_key, timeout, handler) 

699 

700 

701def _use_gateway( 

702 method: str, 

703 gateway: str, 

704 job: str, 

705 registry: Optional[CollectorRegistry], 

706 grouping_key: Optional[Dict[str, Any]], 

707 timeout: Optional[float], 

708 handler: Callable, 

709) -> None: 

710 gateway_url = urlparse(gateway) 

711 # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6. 

712 if not gateway_url.scheme or gateway_url.scheme not in ['http', 'https']: 

713 gateway = f'http://{gateway}' 

714 

715 gateway = gateway.rstrip('/') 

716 url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job)) 

717 

718 data = b'' 

719 if method != 'DELETE': 

720 if registry is None: 

721 registry = REGISTRY 

722 data = generate_latest(registry) 

723 

724 if grouping_key is None: 

725 grouping_key = {} 

726 url += ''.join( 

727 '/{}/{}'.format(*_escape_grouping_key(str(k), str(v))) 

728 for k, v in sorted(grouping_key.items())) 

729 

730 handler( 

731 url=url, method=method, timeout=timeout, 

732 headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data, 

733 )() 

734 

735 

736def _escape_grouping_key(k, v): 

737 if v == "": 

738 # Per https://github.com/prometheus/pushgateway/pull/346. 

739 return k + "@base64", "=" 

740 elif '/' in v: 

741 # Added in Pushgateway 0.9.0. 

742 return k + "@base64", base64.urlsafe_b64encode(v.encode("utf-8")).decode("utf-8") 

743 else: 

744 return k, quote_plus(v) 

745 

746 

747def instance_ip_grouping_key() -> Dict[str, Any]: 

748 """Grouping key with instance set to the IP Address of this host.""" 

749 with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: 

750 if sys.platform == 'darwin': 

751 # This check is done this way only on MacOS devices 

752 # it is done this way because the localhost method does 

753 # not work. 

754 # This method was adapted from this StackOverflow answer: 

755 # https://stackoverflow.com/a/28950776 

756 s.connect(('10.255.255.255', 1)) 

757 else: 

758 s.connect(('localhost', 0)) 

759 

760 return {'instance': s.getsockname()[0]} 

761 

762 

763from .asgi import make_asgi_app # noqa