Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prometheus_client/asgi.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

19 statements  

1from typing import Callable 

2from urllib.parse import parse_qs 

3 

4from .exposition import _bake_output 

5from .registry import CollectorRegistry, REGISTRY 

6 

7 

8def make_asgi_app(registry: CollectorRegistry = REGISTRY, disable_compression: bool = False) -> Callable: 

9 """Create a ASGI app which serves the metrics from a registry.""" 

10 

11 async def prometheus_app(scope, receive, send): 

12 assert scope.get("type") == "http" 

13 # Prepare parameters 

14 params = parse_qs(scope.get('query_string', b'')) 

15 accept_header = ",".join([ 

16 value.decode("utf8") for (name, value) in scope.get('headers') 

17 if name.decode("utf8").lower() == 'accept' 

18 ]) 

19 accept_encoding_header = ",".join([ 

20 value.decode("utf8") for (name, value) in scope.get('headers') 

21 if name.decode("utf8").lower() == 'accept-encoding' 

22 ]) 

23 # Bake output 

24 status, headers, output = _bake_output(registry, accept_header, accept_encoding_header, params, disable_compression) 

25 formatted_headers = [] 

26 for header in headers: 

27 formatted_headers.append(tuple(x.encode('utf8') for x in header)) 

28 # Return output 

29 payload = await receive() 

30 if payload.get("type") == "http.request": 

31 await send( 

32 { 

33 "type": "http.response.start", 

34 "status": int(status.split(' ')[0]), 

35 "headers": formatted_headers, 

36 } 

37 ) 

38 await send({"type": "http.response.body", "body": output}) 

39 

40 return prometheus_app