Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/prometheus_client/exposition.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

342 statements  

1import base64 

2from contextlib import closing 

3from functools import partial 

4import gzip 

5from http.server import BaseHTTPRequestHandler 

6import os 

7import socket 

8from socketserver import ThreadingMixIn 

9import ssl 

10import sys 

11import threading 

12from typing import ( 

13 Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union, 

14) 

15from urllib.error import HTTPError 

16from urllib.parse import parse_qs, quote_plus, urlparse 

17from urllib.request import ( 

18 BaseHandler, build_opener, HTTPHandler, HTTPRedirectHandler, HTTPSHandler, 

19 Request, 

20) 

21from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer 

22 

23from .openmetrics import exposition as openmetrics 

24from .registry import Collector, REGISTRY 

25from .utils import floatToGoString, parse_version 

26 

# python-snappy is an optional dependency: remember whether it imported so
# that code needing it can fail with a clear message instead of a NameError.
try:
    import snappy  # type: ignore
except ImportError:
    snappy = None  # type: ignore
    SNAPPY_AVAILABLE = False
else:
    SNAPPY_AVAILABLE = True

33 

# Names exported by ``from <this module> import *``.
__all__ = (
    # Content types.
    'CONTENT_TYPE_LATEST',
    'CONTENT_TYPE_PLAIN_0_0_4',
    'CONTENT_TYPE_PLAIN_1_0_0',
    # Exposition, serving and push-gateway helpers.
    'delete_from_gateway',
    'generate_latest',
    'instance_ip_grouping_key',
    'make_asgi_app',
    'make_wsgi_app',
    'MetricsHandler',
    'push_to_gateway',
    'pushadd_to_gateway',
    'start_http_server',
    'start_wsgi_server',
    'write_to_textfile',
)

50 

# Payload compression selector: None means "send uncompressed".
CompressionType = Optional[Literal['gzip', 'snappy']]

CONTENT_TYPE_PLAIN_0_0_4 = 'text/plain; version=0.0.4; charset=utf-8'
"""Content type of the compatibility format"""

CONTENT_TYPE_PLAIN_1_0_0 = 'text/plain; version=1.0.0; charset=utf-8'
"""Content type of the latest format"""

# Alias that always points at the newest plain-text content type.
CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0

59 

60 

class _PrometheusRedirectHandler(HTTPRedirectHandler):
    """
    Redirect handler that keeps the request method and body on redirect.

    The standard HTTPRedirectHandler neither forwards request data nor
    allows redirected PUT requests (which Prometheus uses for some
    operations, for example `push_to_gateway`), because it cannot
    generically guarantee that doing so respects the HTTP RFC 2616
    requirement that the user explicitly confirms redirects which may have
    unexpected side effects (such as making a PUT non-idempotent or
    creating resources not named in the original request).

    Using this class is the user's explicit agreement to the redirect
    responses the Prometheus client receives. Only use it when you control
    or otherwise trust the redirect behaviour and are certain it is safe to
    transfer the original request (method and data) in full to the
    redirected URL -- for instance a cosmetic URL redirect in front of a
    local Prometheus deployment.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """
        Build the follow-up request for a redirect response.

        Parameters are those of HTTPRedirectHandler.redirect_request.

        Returns a new Request aimed at ``newurl`` that carries the original
        method, headers and data. Raises HTTPError when the combination of
        status code and method is not one this handler follows.

        Only appropriate when the user knows (a) that the redirect has no
        unacceptable side effects for any request method and (b) that the
        request data should be passed through to the redirect target.
        """
        # Handlers may indicate the method by monkeypatching get_method
        # rather than by setting the Request object's method attribute, so
        # fall back to get_method() when the attribute is absent.
        method = getattr(req, "method", req.get_method())

        read_only_redirect = code in (301, 302, 303, 307) and method in ("GET", "HEAD")
        write_redirect = code in (301, 302, 303) and method in ("POST", "PUT")
        if not (read_only_redirect or write_redirect):
            raise HTTPError(req.full_url, code, msg, headers, fp)

        redirected = Request(
            newurl.replace(' ', '%20'),  # escape spaces in the new URL if needed
            data=req.data,
            headers=req.headers,
            origin_req_host=req.origin_req_host,
            unverifiable=True,
        )
        redirected.method = method
        return redirected

114 

115 

def _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression):
    """Render the metrics response as a ``(status, headers, body)`` triple.

    The encoder and content type come from ``choose_encoder(accept_header)``.
    If ``params`` has a ``name[]`` entry the registry is first narrowed with
    ``restricted_registry``. The body is gzip-compressed only when
    compression is not disabled and the Accept-Encoding header accepts gzip.
    """
    encoder, content_type = choose_encoder(accept_header)
    if 'name[]' in params:
        registry = registry.restricted_registry(params['name[]'])

    body = encoder(registry)
    response_headers = [('Content-Type', content_type)]

    if disable_compression or not gzip_accepted(accept_encoding_header):
        return '200 OK', response_headers, body

    body = gzip.compress(body)
    response_headers.append(('Content-Encoding', 'gzip'))
    return '200 OK', response_headers, body

129 

130 

def make_wsgi_app(registry: Collector = REGISTRY, disable_compression: bool = False) -> Callable:
    """Create a WSGI app which serves the metrics from a registry.

    ``registry`` is the collector whose metrics are exposed. When
    ``disable_compression`` is true the response body is never gzip-encoded.

    The returned application answers OPTIONS with an ``Allow`` header,
    rejects every method other than GET with 405, serves an empty body for
    ``/favicon.ico`` and renders the metrics for any other GET path.
    """

    def prometheus_app(environ, start_response):
        method = environ['REQUEST_METHOD']

        if method == 'OPTIONS':
            status = '200 OK'
            headers = [('Allow', 'OPTIONS,GET')]
            output = b''
        elif method != 'GET':
            status = '405 Method Not Allowed'
            headers = [('Allow', 'OPTIONS,GET')]
            output = '# HTTP {}: {}; use OPTIONS or GET\n'.format(status, method).encode()
        # PEP 3333 lets a server omit PATH_INFO when it would be empty, so
        # read it with a default instead of indexing.
        elif environ.get('PATH_INFO', '') == '/favicon.ico':
            # Serve empty response for browsers
            status = '200 OK'
            headers = []
            output = b''
        else:
            # Note: For backwards compatibility, the URI path for GET is not
            # constrained to the documented /metrics, but any path is allowed.
            accept_header = environ.get('HTTP_ACCEPT')
            accept_encoding_header = environ.get('HTTP_ACCEPT_ENCODING')
            params = parse_qs(environ.get('QUERY_STRING', ''))
            status, headers, output = _bake_output(
                registry, accept_header, accept_encoding_header, params, disable_compression)

        start_response(status, headers)
        return [output]

    return prometheus_app

164 

165 

class _SilentHandler(WSGIRequestHandler):
    """WSGI request handler whose request logging is switched off."""

    def log_message(self, format, *args):
        """Discard the log record instead of writing it anywhere."""
        return None

171 

172 

class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles every request in its own thread."""

    # Worker threads are "fire and forget". From Python 3.7 on,
    # ``ThreadingMixIn`` collects all non-daemon threads in a list so it can
    # join them at server close; daemon threads avoid that memory leak.
    daemon_threads = True

179 

180 

def _get_best_family(address, port):
    """Return ``(address_family, host)`` for the first getaddrinfo match.

    HTTPServer defaults to AF_INET, which does not start properly when an
    IPv6 address is requested. Based on what upstream Python did for
    http.server in https://github.com/python/cpython/pull/11767
    """
    candidates = socket.getaddrinfo(
        address,
        port,
        type=socket.SOCK_STREAM,
        flags=socket.AI_PASSIVE,
    )
    first = next(iter(candidates))
    family, sockaddr = first[0], first[4]
    return family, sockaddr[0]

190 

191 

def _get_ssl_ctx(
        certfile: str,
        keyfile: str,
        protocol: int,
        cafile: Optional[str] = None,
        capath: Optional[str] = None,
        client_auth_required: bool = False,
) -> ssl.SSLContext:
    """Build the SSL context used by the metrics server.

    CA certificates are loaded from ``cafile``/``capath`` when either is
    given, otherwise the default certificates for client authentication are
    loaded. ``client_auth_required`` sets the verify mode to CERT_REQUIRED.
    Finally the server certificate chain is loaded from ``certfile`` and
    ``keyfile``.

    Raises the same IOError subclass that the ssl call raised, with a
    message saying which files were involved; the original exception is
    attached as ``__cause__``.
    """
    ssl_cxt = ssl.SSLContext(protocol=protocol)

    if cafile is not None or capath is not None:
        try:
            ssl_cxt.load_verify_locations(cafile, capath)
        except IOError as exc:
            raise type(exc)(f"Cannot load CA certificate chain from file "
                            f"{cafile!r} or directory {capath!r}: {exc}") from exc
    else:
        try:
            ssl_cxt.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)
        except IOError as exc:
            raise type(exc)(f"Cannot load default CA certificate chain: {exc}") from exc

    if client_auth_required:
        ssl_cxt.verify_mode = ssl.CERT_REQUIRED

    try:
        ssl_cxt.load_cert_chain(certfile=certfile, keyfile=keyfile)
    except IOError as exc:
        raise type(exc)(f"Cannot load server certificate file {certfile!r} or "
                        f"its private key file {keyfile!r}: {exc}") from exc

    return ssl_cxt

231 

232 

def start_wsgi_server(
        port: int,
        addr: str = '0.0.0.0',
        registry: Collector = REGISTRY,
        certfile: Optional[str] = None,
        keyfile: Optional[str] = None,
        client_cafile: Optional[str] = None,
        client_capath: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_SERVER,
        client_auth_required: bool = False,
) -> Tuple[WSGIServer, threading.Thread]:
    """Serve the registry's metrics over WSGI from a daemon thread.

    The listening socket is wrapped in TLS only when both ``certfile`` and
    ``keyfile`` are supplied. Returns the server object together with the
    already-started thread that runs ``serve_forever``.
    """
    family, bind_addr = _get_best_family(addr, port)

    class TmpServer(ThreadingWSGIServer):
        """Copy of ThreadingWSGIServer to update address_family locally"""
        address_family = family

    httpd = make_server(
        bind_addr, port, make_wsgi_app(registry), TmpServer,
        handler_class=_SilentHandler,
    )
    if certfile and keyfile:
        context = _get_ssl_ctx(
            certfile, keyfile, protocol,
            client_cafile, client_capath, client_auth_required,
        )
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)

    server_thread = threading.Thread(target=httpd.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    return httpd, server_thread


# Alias for start_wsgi_server.
start_http_server = start_wsgi_server

263 

264 

def generate_latest(registry: Collector = REGISTRY, escaping: str = openmetrics.UNDERSCORES) -> bytes:
    """
    Render every metric of ``registry`` in the basic Prometheus text format.

    Params:
        registry: Collector to export data from.
        escaping: Escaping scheme used for metric and label names.

    Returns: UTF-8 encoded string containing the metrics in text format.
    """
    # OpenMetrics-only sample suffixes, emitted as separate gauges at the end
    # of their metric.
    om_suffixes = ('_created', '_gsum', '_gcount')
    # Munging from OpenMetrics into Prometheus format.
    name_suffix_by_type = {'counter': '_total', 'info': '_info'}
    type_renames = {
        'info': 'gauge',
        'stateset': 'gauge',
        # A gauge histogram is really a gauge,
        # but this captures the structure better.
        'gaugehistogram': 'histogram',
        'unknown': 'untyped',
    }

    def escape_help(text):
        return text.replace('\\', r'\\').replace('\n', r'\n')

    def sample_line(samples):
        labelstr = ''
        if samples.labels:
            # Label values always support UTF-8
            labelstr = ','.join(
                '{}="{}"'.format(
                    openmetrics.escape_label_name(k, escaping),
                    openmetrics._escape(v, openmetrics.ALLOWUTF8, False))
                for k, v in sorted(samples.labels.items()))

        timestamp = ''
        if samples.timestamp is not None:
            # Convert to milliseconds.
            millis = int(float(samples.timestamp) * 1000)
            timestamp = f' {millis:d}'

        name_outside_braces = (
            escaping != openmetrics.ALLOWUTF8
            or openmetrics._is_valid_legacy_metric_name(samples.name))
        name = openmetrics.escape_metric_name(samples.name, escaping)
        value = floatToGoString(samples.value)

        if name_outside_braces:
            braces = '{{{0}}}'.format(labelstr) if labelstr else ''
            return f'{name}{braces} {value}{timestamp}\n'

        # Otherwise the metric name goes inside the braces, ahead of the labels.
        maybe_comma = ',' if labelstr else ''
        return f'{{{name}{maybe_comma}{labelstr}}} {value}{timestamp}\n'

    output = []
    for metric in registry.collect():
        try:
            mtype = metric.type
            mname = metric.name
            extra = name_suffix_by_type.get(mtype)
            if extra:
                mname = mname + extra
            mtype = type_renames.get(mtype, mtype)

            escaped_name = openmetrics.escape_metric_name(mname, escaping)
            output.append('# HELP {} {}\n'.format(escaped_name, escape_help(metric.documentation)))
            output.append(f'# TYPE {openmetrics.escape_metric_name(mname, escaping)} {mtype}\n')

            om_samples: Dict[str, List[str]] = {}
            for s in metric.samples:
                matched = next(
                    (sfx for sfx in om_suffixes if s.name == metric.name + sfx),
                    None)
                if matched is None:
                    output.append(sample_line(s))
                else:
                    # OpenMetrics specific sample, put in a gauge at the end.
                    om_samples.setdefault(matched, []).append(sample_line(s))
        except Exception as exception:
            # Attach the offending metric to the error to ease debugging.
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix, lines in sorted(om_samples.items()):
            om_name = metric.name + suffix
            output.append('# HELP {} {}\n'.format(
                openmetrics.escape_metric_name(om_name, escaping),
                escape_help(metric.documentation)))
            output.append(f'# TYPE {openmetrics.escape_metric_name(om_name, escaping)} gauge\n')
            output.extend(lines)
    return ''.join(output).encode('utf-8')

341 

342 

def choose_encoder(accept_header: str) -> Tuple[Callable[[Collector], bytes], str]:
    """Pick the ``(encoder, content_type)`` pair for an Accept header.

    The entries of the header are tried in order. An OpenMetrics entry with
    no version yields the 1.0.0 OpenMetrics encoder with underscore escaping;
    an OpenMetrics or text/plain entry whose version is at least 1.0.0 yields
    an encoder honouring the requested escaping. When nothing matches the
    0.0.4 plain-text format is used.
    """
    # Python client library accepts a narrower range of content-types than
    # Prometheus does.
    for accepted in (accept_header or '').split(','):
        toks = accepted.split(';')
        media_type = toks[0].strip()

        if media_type == 'application/openmetrics-text':
            version = _get_version(toks)
            escaping = _get_escaping(toks)
            # Only return an escaping header if we have a good version and
            # mimetype.
            if not version:
                encoder = partial(
                    openmetrics.generate_latest,
                    escaping=openmetrics.UNDERSCORES,
                    version="1.0.0",
                )
                return (encoder, openmetrics.CONTENT_TYPE_LATEST)
            if parse_version(version) >= (1, 0, 0):
                encoder = partial(openmetrics.generate_latest, escaping=escaping, version=version)
                return (encoder,
                        f'application/openmetrics-text; version={version}; charset=utf-8; escaping={escaping!s}')
        elif media_type == 'text/plain':
            version = _get_version(toks)
            escaping = _get_escaping(toks)
            # Only return an escaping header if we have a good version and
            # mimetype.
            if version and parse_version(version) >= (1, 0, 0):
                return (partial(generate_latest, escaping=escaping),
                        f'{CONTENT_TYPE_LATEST}; escaping={escaping!s}')

    return generate_latest, CONTENT_TYPE_PLAIN_0_0_4

370 

371 

def _get_version(accept_header: List[str]) -> str:
    """Return the value of the first ``version=...`` token.

    ``accept_header`` is one Accept entry already split on ``;``. Tokens
    without an ``=`` are skipped; the empty string is returned when no
    version token is present.
    """
    for tok in accept_header:
        key, sep, value = tok.strip().partition('=')
        if sep and key == 'version':
            return value
    return ""

384 

385 

def _get_escaping(accept_header: List[str]) -> str:
    """Return the escaping scheme named by the first ``escaping=...`` token.

    ``accept_header`` is one Accept entry already split on ``;``. If there is
    no escaping token, or its value is not one of the allowed schemes,
    UNDERSCORES is returned.
    """
    allowed = (
        openmetrics.ALLOWUTF8,
        openmetrics.UNDERSCORES,
        openmetrics.DOTS,
        openmetrics.VALUES,
    )
    for tok in accept_header:
        key, sep, value = tok.strip().partition('=')
        if not sep or key != 'escaping':
            continue
        # The first escaping token decides the result, valid or not.
        for scheme in allowed:
            if value == scheme:
                return scheme
        return openmetrics.UNDERSCORES
    return openmetrics.UNDERSCORES

409 

410 

def gzip_accepted(accept_encoding_header: str) -> bool:
    """Tell whether an Accept-Encoding header value allows a gzip response.

    ``accept_encoding_header`` may be None or empty, in which case gzip is
    not accepted. Coding names are compared case-insensitively. A ``gzip``
    entry with a quality value of zero (``gzip;q=0``) is an explicit refusal
    and does not count; an unparsable quality value is ignored, so the entry
    counts as accepted.
    """
    for accepted in (accept_encoding_header or '').split(','):
        coding, *params = accepted.split(';')
        if coding.strip().lower() != 'gzip':
            continue

        refused = False
        for param in params:
            name, sep, raw_weight = param.partition('=')
            if not sep or name.strip().lower() != 'q':
                continue
            try:
                refused = float(raw_weight.strip()) == 0
            except ValueError:
                # Malformed weight: treat the entry as a plain "gzip".
                refused = False
            break

        if not refused:
            return True
    return False

417 

418 

class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP request handler that serves the metrics of ``registry``."""

    # Registry served by this handler class; defaults to the global REGISTRY.
    registry: Collector = REGISTRY

    def do_GET(self) -> None:
        """Answer a GET request with the rendered metrics."""
        source = self.registry
        accept = self.headers.get('Accept')
        accept_encoding = self.headers.get('Accept-Encoding')
        query = parse_qs(urlparse(self.path).query)

        # This handler never disables compression.
        status, headers, output = _bake_output(source, accept, accept_encoding, query, False)

        # The status line looks like "200 OK"; only the numeric code is sent.
        status_code = int(status.split(' ')[0])
        self.send_response(status_code)
        for name, value in headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(output)

    def log_message(self, format: str, *args: Any) -> None:
        """Log nothing."""

    @classmethod
    def factory(cls, registry: Collector) -> type:
        """Return a dynamically created subclass bound to ``registry``.

        The subclass keeps this class's name and overrides only the
        ``registry`` class attribute that ``do_GET`` reads.
        """
        return type(str(cls.__name__), (cls, object), {"registry": registry})

455 

456 

def write_to_textfile(path: str, registry: Collector, escaping: str = openmetrics.ALLOWUTF8, tmpdir: Optional[str] = None) -> None:
    """Write the registry's metrics to ``path`` via a temporary file.

    This is intended for use with the Node exporter textfile collector.
    The path must end in .prom for the textfile collector to process it.

    The metrics are first written to a temporary file whose name ends with
    the process id and thread id, and that file is then moved over ``path``.
    The temporary file lives next to ``path`` unless ``tmpdir`` is given;
    if provided, ``tmpdir`` MUST be on the same filesystem as ``path``.
    On failure the temporary file is removed and the error re-raised.
    """
    unique_suffix = f'.{os.getpid()}.{threading.current_thread().ident}'
    if tmpdir is None:
        tmppath = f'{path}{unique_suffix}'
    else:
        staged = os.path.join(tmpdir, os.path.basename(path))
        tmppath = f'{staged}{unique_suffix}'

    try:
        with open(tmppath, 'wb') as f:
            f.write(generate_latest(registry, escaping))

        # rename(2) is atomic but fails on Windows if the destination file exists
        move = os.replace if os.name == 'nt' else os.rename
        move(tmppath, path)
    except Exception:
        if os.path.exists(tmppath):
            os.remove(tmppath)
        raise

485 

486 

def _make_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: Sequence[Tuple[str, str]],
        data: bytes,
        base_handler: Union[BaseHandler, type],
) -> Callable[[], None]:
    """Return a callable that performs one HTTP request when invoked.

    The request goes to ``url`` with ``method``, ``headers`` and ``data``
    through an opener built around ``base_handler``. Invoking the callable
    raises OSError if the response code is 400 or above. The response is
    always closed before the callable returns.
    """
    def handle() -> None:
        request = Request(url, data=data)
        # Request has no setter for arbitrary methods here; override the
        # accessor so the opener sends the requested method.
        request.get_method = lambda: method  # type: ignore
        for k, v in headers:
            request.add_header(k, v)
        resp = build_opener(base_handler).open(request, timeout=timeout)
        try:
            if resp.code >= 400:
                raise OSError(f"error talking to pushgateway: {resp.code} {resp.msg}")
        finally:
            # Release the underlying connection instead of leaving it to the
            # garbage collector.
            resp.close()

    return handle

505 

506 

def default_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """Build the plain request callable based on ``HTTPHandler``.

    Used by the push_to_gateway functions. Can be re-used by other handlers.
    """
    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=HTTPHandler,
    )

519 

520 

def passthrough_redirect_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
) -> Callable[[], None]:
    """
    Build a request callable that trusts redirects for all HTTP methods.

    Goes beyond the standard HTTPRedirectHandler by permitting PUT requests,
    keeping the method across the redirect, and passing through all headers
    and data of the original request: the identical request, same method
    and data, is simply repeated against the redirect URL. Only use this
    handler if you control or trust the source of the redirect responses
    you encounter when making requests via the Prometheus client.
    """
    return _make_handler(
        url=url,
        method=method,
        timeout=timeout,
        headers=headers,
        data=data,
        base_handler=_PrometheusRedirectHandler,
    )

539 

540 

def basic_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        username: Optional[str] = None,
        password: Optional[str] = None,
) -> Callable[[], None]:
    """Handler that implements HTTP/HTTPS connections with Basic Auth.

    Sets auth headers using supplied 'username' and 'password', if both are
    set. Used by the push_to_gateway functions. Can be re-used by other
    handlers. The caller's ``headers`` list is left unmodified.
    """

    def handle():
        """Send the request, adding the Authorization header if configured."""
        # Work on a copy: appending to the caller's list would stack another
        # Authorization header every time this callable is invoked.
        request_headers = list(headers)
        if username is not None and password is not None:
            auth_value = f'{username}:{password}'.encode()
            auth_token = base64.b64encode(auth_value)
            auth_header = b'Basic ' + auth_token
            request_headers.append(('Authorization', auth_header))
        default_handler(url, method, timeout, request_headers, data)()

    return handle

566 

567 

def tls_auth_handler(
        url: str,
        method: str,
        timeout: Optional[float],
        headers: List[Tuple[str, str]],
        data: bytes,
        certfile: str,
        keyfile: str,
        cafile: Optional[str] = None,
        protocol: int = ssl.PROTOCOL_TLS_CLIENT,
        insecure_skip_verify: bool = False,
) -> Callable[[], None]:
    """Build a request callable for an HTTPS connection with TLS auth.

    The default protocol (ssl.PROTOCOL_TLS_CLIENT) also enables
    ssl.CERT_REQUIRED and SSLContext.check_hostname by default. Setting
    ``insecure_skip_verify`` to True disables both.

    CA certificates come from ``cafile`` when given, otherwise from the
    default locations; the client certificate chain is loaded from
    ``certfile`` and ``keyfile``.

    Both this handler and the TLS feature on pushgateway are experimental.
    """
    tls_context = ssl.SSLContext(protocol=protocol)

    if cafile is None:
        tls_context.load_default_certs()
    else:
        tls_context.load_verify_locations(cafile)

    if insecure_skip_verify:
        # Hostname checking is switched off before verification is disabled.
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

    tls_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return _make_handler(url, method, timeout, headers, data, HTTPSHandler(context=tls_context))

600 

601 

def push_to_gateway(
        gateway: str,
        job: str,
        registry: Collector,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
        compression: CompressionType = None,
) -> None:
    """Push metrics to the given pushgateway.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided
    `job` is the job label to be attached to all pushed metrics
    `registry` is a Collector, normally an instance of CollectorRegistry
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              The argument must be a function which accepts the following
              keyword arguments: url, method, timeout, headers, and data,
              and which returns a callable that performs the request.
              May be used to implement additional functionality not
              supported by the built-in default handler (such as SSL
              client certificates, and HTTP authentication mechanisms).
              'url' is the URL for the request, the 'gateway' argument
              described earlier will form the basis of this URL.
              'method' is the HTTP method which should be used when
              carrying out the request.
              'timeout' requests not successfully completed after this
              many seconds should be aborted.  If timeout is None, then
              the handler should not set a timeout.
              'headers' is a list of ("header-name","header-value") tuples
              which must be passed to the pushgateway in the form of HTTP
              request headers.
              'data' is the content which should be used to form the HTTP
              Message Body.
              The function should raise an exception (e.g. IOError) on
              failure.
    `compression` selects the payload compression. Supported values are 'gzip'
              and 'snappy'. Defaults to None (no compression).

    This overwrites all metrics with the same job and grouping_key.
    This uses the PUT HTTP method."""
    _use_gateway(
        method='PUT',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
        compression=compression,
    )

652 

653 

def pushadd_to_gateway(
        gateway: str,
        job: str,
        registry: Optional[Collector],
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
        compression: CompressionType = None,
) -> None:
    """PushAdd metrics to the given pushgateway.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided
    `job` is the job label to be attached to all pushed metrics
    `registry` is a Collector, normally an instance of CollectorRegistry
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.
    `compression` selects the payload compression. Supported values are 'gzip'
              and 'snappy'. Defaults to None (no compression).

    This replaces metrics with the same name, job and grouping_key.
    This uses the POST HTTP method."""
    _use_gateway(
        method='POST',
        gateway=gateway,
        job=job,
        registry=registry,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
        compression=compression,
    )

686 

687 

def delete_from_gateway(
        gateway: str,
        job: str,
        grouping_key: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 30,
        handler: Callable = default_handler,
) -> None:
    """Delete metrics from the given pushgateway.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided
    `job` is the job label to be attached to all pushed metrics
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None
    `timeout` is how long delete will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.

    This deletes metrics with the given job and grouping_key.
    This uses the DELETE HTTP method."""
    # No registry is involved in a delete: nothing is rendered or sent.
    _use_gateway(
        method='DELETE',
        gateway=gateway,
        job=job,
        registry=None,
        grouping_key=grouping_key,
        timeout=timeout,
        handler=handler,
    )

715 

716 

def _use_gateway(
        method: str,
        gateway: str,
        job: str,
        registry: Optional[Collector],
        grouping_key: Optional[Dict[str, Any]],
        timeout: Optional[float],
        handler: Callable,
        compression: CompressionType = None,
) -> None:
    """Build the pushgateway URL and payload, then run the request.

    The URL is ``<gateway>/metrics/<job pair>`` followed by one path pair
    per grouping key entry, in sorted key order. For every method except
    DELETE the body is the rendered registry (the global REGISTRY when
    ``registry`` is None), optionally compressed. DELETE sends an empty
    body and raises ValueError if a compression is requested.
    """
    # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
    if urlparse(gateway).scheme not in ('http', 'https'):
        gateway = f'http://{gateway}'
    gateway = gateway.rstrip('/')

    if grouping_key is None:
        grouping_key = {}

    path_pairs = [_escape_grouping_key("job", job)]
    path_pairs.extend(
        _escape_grouping_key(str(k), str(v))
        for k, v in sorted(grouping_key.items()))
    url = f'{gateway}/metrics' + ''.join('/{}/{}'.format(*pair) for pair in path_pairs)

    headers: List[Tuple[str, str]]
    if method == 'DELETE':
        if compression is not None:
            raise ValueError('Compression is not supported for DELETE requests.')
        data = b''
        # DELETE requests still need Content-Type header per test expectations
        headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
    else:
        source = REGISTRY if registry is None else registry
        data, headers = _compress_payload(generate_latest(source), compression)

    send = handler(
        url=url, method=method, timeout=timeout,
        headers=headers, data=data,
    )
    send()

758 

759 

def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
    """Compress ``data`` as requested and return ``(payload, headers)``.

    The headers always carry the 0.0.4 plain-text Content-Type; a
    Content-Encoding header is added when a compression is applied. The
    compression name is matched case-insensitively.

    Raises RuntimeError for snappy when python-snappy is not installed and
    ValueError for any other unknown compression name.
    """
    headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
    if compression is None:
        return data, headers

    encoding = compression.lower()
    if encoding not in ('gzip', 'snappy'):
        raise ValueError(f"Unsupported compression type: {compression}")

    if encoding == 'gzip':
        headers.append(('Content-Encoding', 'gzip'))
        return gzip.compress(data), headers

    if not SNAPPY_AVAILABLE:
        raise RuntimeError('Snappy compression requires the python-snappy package to be installed.')
    headers.append(('Content-Encoding', 'snappy'))
    compressor = snappy.StreamCompressor()
    payload = compressor.compress(data)
    # Not every StreamCompressor exposes flush(); use it only when present.
    flush = getattr(compressor, 'flush', None)
    if callable(flush):
        payload += flush()
    return payload, headers

780 

781 

def _escape_grouping_key(k, v):
    """Return the ``(name, value)`` URL path pair for one grouping label.

    Empty values and values containing ``/`` are sent under the
    ``<name>@base64`` form; everything else is escaped with ``quote_plus``.
    """
    if v == "":
        # Per https://github.com/prometheus/pushgateway/pull/346.
        return k + "@base64", "="
    if '/' not in v:
        return k, quote_plus(v)
    # Added in Pushgateway 0.9.0.
    encoded = base64.urlsafe_b64encode(v.encode("utf-8"))
    return k + "@base64", encoded.decode("utf-8")

791 

792 

def instance_ip_grouping_key() -> Dict[str, Any]:
    """Grouping key with instance set to the IP Address of this host."""
    if sys.platform == 'darwin':
        # This check is done this way only on MacOS devices because the
        # localhost method does not work there.
        # Adapted from this StackOverflow answer:
        # https://stackoverflow.com/a/28950776
        probe_target = ('10.255.255.255', 1)
    else:
        probe_target = ('localhost', 0)

    with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
        s.connect(probe_target)
        # The local end of the connected socket gives the address reported.
        return {'instance': s.getsockname()[0]}

807 

808 

809from .asgi import make_asgi_app # noqa