Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_log.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import datetime
2import functools
3import logging
4import os
5import re
6import time as time_mod
7from collections.abc import Iterable
8from typing import Callable, ClassVar, NamedTuple
10from .abc import AbstractAccessLogger
11from .web_request import BaseRequest
12from .web_response import StreamResponse
class KeyMethod(NamedTuple):
    """Associate a log-record key with the callable producing its value."""

    # Either a plain key name, or a (key name, sub-key) pair.
    key: str | tuple[str, str]
    # Called with (request, response, time) to render one log value.
    method: Callable[
        [BaseRequest, StreamResponse, float],
        str,
    ]
class AccessLogger(AbstractAccessLogger):
    """Helper object to log access.

    Usage:
        log = logging.getLogger("spam")
        log_format = "%a %{User-Agent}i"
        access_logger = AccessLogger(log, log_format)
        access_logger.log(request, response, time)

    Format:
        %%  The percent sign
        %a  Remote IP-address (IP-address of proxy if using reverse proxy)
        %t  Time when the request was started to process
        %P  The process ID of the child that serviced the request
        %r  First line of request
        %s  Response status code
        %b  Size of response in bytes, including HTTP headers
        %T  Time taken to serve the request, in seconds
        %Tf Time taken to serve the request, in seconds with floating fraction
            in .06f format
        %D  Time taken to serve the request, in microseconds
        %{FOO}i  request.headers['FOO']
        %{FOO}o  response.headers['FOO']
        %{FOO}e  os.environ['FOO']

    Any other "%" sequence is not an atom and is written to the log as
    literal text.
    """

    # Maps an atom (the text after "%", or the trailing letter of a
    # "%{NAME}x" atom) to the key used for its value in the ``extra``
    # mapping handed to the logger.
    LOG_FORMAT_MAP = {
        "a": "remote_address",
        "t": "request_start_time",
        "P": "process_id",
        "r": "first_request_line",
        "s": "response_status",
        "b": "response_size",
        "T": "request_time",
        "Tf": "request_time_frac",
        "D": "request_time_micro",
        "i": "request_header",
        "o": "response_header",
        "e": "environ",
    }

    LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
    # Every atom accepted here must have both a LOG_FORMAT_MAP entry and a
    # matching _format_* method; compile_format() looks both up
    # unconditionally.
    FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbD]|Tf?)")
    # Matches a "%" that does not start a "%s" placeholder, so it can be
    # escaped before modulo formatting.
    CLEANUP_RE = re.compile(r"(%[^s])")
    # Compiled formats, keyed by the raw format string and shared by all
    # instances.
    _FORMAT_CACHE: dict[str, tuple[str, list[KeyMethod]]] = {}

    # Cached local timezone and the POSIX timestamp at which it goes stale.
    _cached_tz: ClassVar[datetime.timezone | None] = None
    _cached_tz_expires: ClassVar[float] = 0.0

    def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
        """Initialise the logger.

        logger is a logger object to be used for logging.
        log_format is a string with apache compatible log format description.

        """
        super().__init__(logger, log_format=log_format)

        _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
        if _compiled_format is None:
            _compiled_format = self.compile_format(log_format)
            AccessLogger._FORMAT_CACHE[log_format] = _compiled_format

        self._log_format, self._methods = _compiled_format

    def compile_format(self, log_format: str) -> tuple[str, list[KeyMethod]]:
        """Translate log_format into form usable by modulo formatting

        All known atoms will be replaced with %s
        Also methods for formatting of those atoms will be collected
        in the order the atoms appear.

        For example we have log_format = "%a %t"
        This format will be translated to "%s %s"
        and the returned methods will be
        [KeyMethod("remote_address", _format_a),
         KeyMethod("request_start_time", _format_t)]
        These methods will be called and results will be passed
        to translated string format.

        Each _format_* method receives the (request, response, time)
        arguments given to self.log

        Exceptions are _format_e, _format_i and _format_o methods which
        also receive key name (by functools.partial)

        "%%" always produces a single literal percent sign and is never
        read as the start of an atom, so "%%a" logs the text "%a".
        Any other "%" that does not start a known atom is escaped and
        logged as-is.

        """
        # list of (key, method) tuples, we don't use an OrderedDict as users
        # can repeat the same key more than once
        methods: list[KeyMethod] = []
        pieces: list[str] = []

        # Split on "%%" first: the pieces in between contain no escaped
        # percent signs, and joining them back with "%%" restores an escape
        # that modulo formatting already understands.
        for piece in log_format.split("%%"):
            for atom in self.FORMAT_RE.findall(piece):
                if atom[1] == "":
                    format_key1 = self.LOG_FORMAT_MAP[atom[0]]
                    m = getattr(AccessLogger, "_format_%s" % atom[0])
                    key_method = KeyMethod(format_key1, m)
                else:
                    format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
                    m = getattr(AccessLogger, "_format_%s" % atom[2])
                    key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))

                methods.append(key_method)

            piece = self.FORMAT_RE.sub(r"%s", piece)
            piece = self.CLEANUP_RE.sub(r"%\1", piece)
            if piece.endswith("%"):
                # CLEANUP_RE needs a character after the "%", so a lone
                # trailing "%" has to be escaped separately.
                piece += "%"
            pieces.append(piece)

        return "%%".join(pieces), methods

    @staticmethod
    def _format_e(
        key: str, request: BaseRequest, response: StreamResponse, time: float
    ) -> str:
        """Return environment variable *key*, or "-" when it is not set."""
        return os.environ.get(key, "-")

    @staticmethod
    def _format_i(
        key: str, request: BaseRequest, response: StreamResponse, time: float
    ) -> str:
        """Return request header *key*, or "-" when it is absent."""
        # suboptimal, make istr(key) once
        return request.headers.get(key, "-")

    @staticmethod
    def _format_o(
        key: str, request: BaseRequest, response: StreamResponse, time: float
    ) -> str:
        """Return response header *key*, or "-" when it is absent."""
        # suboptimal, make istr(key) once
        return response.headers.get(key, "-")

    @staticmethod
    def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return request.remote, or "-" when it is None."""
        ip = request.remote
        return ip if ip is not None else "-"

    @classmethod
    def _get_local_time(cls) -> datetime.datetime:
        """Return the current time as an aware datetime in the local UTC offset.

        The offset is cached on the class and refreshed once the cached
        expiry timestamp has passed.
        """
        if cls._cached_tz is None or time_mod.time() >= cls._cached_tz_expires:
            gmtoff = time_mod.localtime().tm_gmtoff
            cls._cached_tz = tz = datetime.timezone(datetime.timedelta(seconds=gmtoff))

            now = datetime.datetime.now(tz)
            # Expire at every 30 mins, as any DST change should occur at 0/30 mins past.
            d = now + datetime.timedelta(minutes=30)
            d = d.replace(minute=30 if d.minute >= 30 else 0, second=0, microsecond=0)
            cls._cached_tz_expires = d.timestamp()
            return now

        return datetime.datetime.now(cls._cached_tz)

    @staticmethod
    def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return the request start time as "[dd/Mon/YYYY:HH:MM:SS +zzzz]"."""
        now = AccessLogger._get_local_time()
        # ``time`` is the elapsed duration in seconds, so the start is "now"
        # minus that duration.
        start_time = now - datetime.timedelta(seconds=time)
        return start_time.strftime("[%d/%b/%Y:%H:%M:%S %z]")

    @staticmethod
    def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return the current process ID wrapped in angle brackets."""
        return "<%s>" % os.getpid()

    @staticmethod
    def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return "<method> <path and query> HTTP/<major>.<minor>"."""
        return f"{request.method} {request.path_qs} HTTP/{request.version.major}.{request.version.minor}"

    @staticmethod
    def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
        """Return response.status."""
        return response.status

    @staticmethod
    def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
        """Return response.body_length."""
        return response.body_length

    @staticmethod
    def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return the elapsed time rounded to whole seconds."""
        return str(round(time))

    @staticmethod
    def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return the elapsed time in seconds with six decimal places."""
        return "%06f" % time

    @staticmethod
    def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
        """Return the elapsed time rounded to whole microseconds."""
        return str(round(time * 1000000))

    def _format_line(
        self, request: BaseRequest, response: StreamResponse, time: float
    ) -> Iterable[tuple[str | tuple[str, str], str]]:
        """Evaluate every compiled atom, returning (key, value) pairs in order."""
        return [(key, method(request, response, time)) for key, method in self._methods]

    @property
    def enabled(self) -> bool:
        """Check if logger is enabled."""
        # Avoid formatting the log line if it will not be emitted.
        return self.logger.isEnabledFor(logging.INFO)

    def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
        """Emit one access-log record at INFO level.

        The rendered line is the log message; the individual values are also
        passed through ``extra``. Header-style atoms are grouped in a nested
        dict, e.g. extra["request_header"]["User-Agent"].

        Any exception raised while building or emitting the record is caught
        and reported through self.logger.exception instead of propagating.
        """
        try:
            fmt_info = self._format_line(request, response, time)

            values = []
            extra: dict[str, str | dict[str, str]] = {}
            for key, value in fmt_info:
                values.append(value)

                if isinstance(key, str):
                    extra[key] = value
                else:
                    # (group, name) key: collect values per group.
                    k1, k2 = key
                    dct: dict[str, str] = extra.get(k1, {})  # type: ignore[assignment]
                    dct[k2] = value
                    extra[k1] = dct

            self.logger.info(self._log_format % tuple(values), extra=extra)
        except Exception:
            self.logger.exception("Error in logging")