Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_log.py: 43%
102 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import datetime
2import functools
3import logging
4import os
5import re
6import time as time_mod
7from collections import namedtuple
8from typing import Any, Callable, Dict, Iterable, List, Tuple # noqa
10from .abc import AbstractAccessLogger
11from .web_request import BaseRequest
12from .web_response import StreamResponse
14KeyMethod = namedtuple("KeyMethod", "key method")
17class AccessLogger(AbstractAccessLogger):
18 """Helper object to log access.
20 Usage:
21 log = logging.getLogger("spam")
22 log_format = "%a %{User-Agent}i"
23 access_logger = AccessLogger(log, log_format)
24 access_logger.log(request, response, time)
26 Format:
27 %% The percent sign
28 %a Remote IP-address (IP-address of proxy if using reverse proxy)
29 %t Time when the request was started to process
30 %P The process ID of the child that serviced the request
31 %r First line of request
32 %s Response status code
33 %b Size of response in bytes, including HTTP headers
34 %T Time taken to serve the request, in seconds
35 %Tf Time taken to serve the request, in seconds with floating fraction
36 in .06f format
37 %D Time taken to serve the request, in microseconds
38 %{FOO}i request.headers['FOO']
39 %{FOO}o response.headers['FOO']
40 %{FOO}e os.environ['FOO']
42 """
44 LOG_FORMAT_MAP = {
45 "a": "remote_address",
46 "t": "request_start_time",
47 "P": "process_id",
48 "r": "first_request_line",
49 "s": "response_status",
50 "b": "response_size",
51 "T": "request_time",
52 "Tf": "request_time_frac",
53 "D": "request_time_micro",
54 "i": "request_header",
55 "o": "response_header",
56 }
58 LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
59 FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)")
60 CLEANUP_RE = re.compile(r"(%[^s])")
61 _FORMAT_CACHE: Dict[str, Tuple[str, List[KeyMethod]]] = {}
63 def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
64 """Initialise the logger.
66 logger is a logger object to be used for logging.
67 log_format is a string with apache compatible log format description.
69 """
70 super().__init__(logger, log_format=log_format)
72 _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
73 if not _compiled_format:
74 _compiled_format = self.compile_format(log_format)
75 AccessLogger._FORMAT_CACHE[log_format] = _compiled_format
77 self._log_format, self._methods = _compiled_format
79 def compile_format(self, log_format: str) -> Tuple[str, List[KeyMethod]]:
80 """Translate log_format into form usable by modulo formatting
82 All known atoms will be replaced with %s
83 Also methods for formatting of those atoms will be added to
84 _methods in appropriate order
86 For example we have log_format = "%a %t"
87 This format will be translated to "%s %s"
88 Also contents of _methods will be
89 [self._format_a, self._format_t]
90 These method will be called and results will be passed
91 to translated string format.
93 Each _format_* method receive 'args' which is list of arguments
94 given to self.log
96 Exceptions are _format_e, _format_i and _format_o methods which
97 also receive key name (by functools.partial)
99 """
100 # list of (key, method) tuples, we don't use an OrderedDict as users
101 # can repeat the same key more than once
102 methods = list()
104 for atom in self.FORMAT_RE.findall(log_format):
105 if atom[1] == "":
106 format_key1 = self.LOG_FORMAT_MAP[atom[0]]
107 m = getattr(AccessLogger, "_format_%s" % atom[0])
108 key_method = KeyMethod(format_key1, m)
109 else:
110 format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
111 m = getattr(AccessLogger, "_format_%s" % atom[2])
112 key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))
114 methods.append(key_method)
116 log_format = self.FORMAT_RE.sub(r"%s", log_format)
117 log_format = self.CLEANUP_RE.sub(r"%\1", log_format)
118 return log_format, methods
120 @staticmethod
121 def _format_i(
122 key: str, request: BaseRequest, response: StreamResponse, time: float
123 ) -> str:
124 if request is None:
125 return "(no headers)"
127 # suboptimal, make istr(key) once
128 return request.headers.get(key, "-")
130 @staticmethod
131 def _format_o(
132 key: str, request: BaseRequest, response: StreamResponse, time: float
133 ) -> str:
134 # suboptimal, make istr(key) once
135 return response.headers.get(key, "-")
137 @staticmethod
138 def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
139 if request is None:
140 return "-"
141 ip = request.remote
142 return ip if ip is not None else "-"
144 @staticmethod
145 def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
146 tz = datetime.timezone(datetime.timedelta(seconds=-time_mod.timezone))
147 now = datetime.datetime.now(tz)
148 start_time = now - datetime.timedelta(seconds=time)
149 return start_time.strftime("[%d/%b/%Y:%H:%M:%S %z]")
151 @staticmethod
152 def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
153 return "<%s>" % os.getpid()
155 @staticmethod
156 def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
157 if request is None:
158 return "-"
159 return "{} {} HTTP/{}.{}".format(
160 request.method,
161 request.path_qs,
162 request.version.major,
163 request.version.minor,
164 )
166 @staticmethod
167 def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
168 return response.status
170 @staticmethod
171 def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
172 return response.body_length
174 @staticmethod
175 def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
176 return str(round(time))
178 @staticmethod
179 def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
180 return "%06f" % time
182 @staticmethod
183 def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
184 return str(round(time * 1000000))
186 def _format_line(
187 self, request: BaseRequest, response: StreamResponse, time: float
188 ) -> Iterable[Tuple[str, Callable[[BaseRequest, StreamResponse, float], str]]]:
189 return [(key, method(request, response, time)) for key, method in self._methods]
191 def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
192 if not self.logger.isEnabledFor(logging.INFO):
193 # Avoid formatting the log line if it will not be emitted.
194 return
195 try:
196 fmt_info = self._format_line(request, response, time)
198 values = list()
199 extra = dict()
200 for key, value in fmt_info:
201 values.append(value)
203 if key.__class__ is str:
204 extra[key] = value
205 else:
206 k1, k2 = key # type: ignore[misc]
207 dct = extra.get(k1, {}) # type: ignore[var-annotated,has-type]
208 dct[k2] = value # type: ignore[index,has-type]
209 extra[k1] = dct # type: ignore[has-type,assignment]
211 self.logger.info(self._log_format % tuple(values), extra=extra)
212 except Exception:
213 self.logger.exception("Error in logging")