Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/web_log.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import datetime
2import functools
3import logging
4import os
5import re
6import time as time_mod
7from collections import namedtuple
8from collections.abc import Iterable
9from typing import Callable, ClassVar
11from .abc import AbstractAccessLogger
12from .web_request import BaseRequest
13from .web_response import StreamResponse
15KeyMethod = namedtuple("KeyMethod", "key method")
18class AccessLogger(AbstractAccessLogger):
19 """Helper object to log access.
21 Usage:
22 log = logging.getLogger("spam")
23 log_format = "%a %{User-Agent}i"
24 access_logger = AccessLogger(log, log_format)
25 access_logger.log(request, response, time)
27 Format:
28 %% The percent sign
29 %a Remote IP-address (IP-address of proxy if using reverse proxy)
30 %t Time when the request was started to process
31 %P The process ID of the child that serviced the request
32 %r First line of request
33 %s Response status code
34 %b Size of response in bytes, including HTTP headers
35 %T Time taken to serve the request, in seconds
36 %Tf Time taken to serve the request, in seconds with floating fraction
37 in .06f format
38 %D Time taken to serve the request, in microseconds
39 %{FOO}i request.headers['FOO']
40 %{FOO}o response.headers['FOO']
41 %{FOO}e os.environ['FOO']
43 """
45 LOG_FORMAT_MAP = {
46 "a": "remote_address",
47 "t": "request_start_time",
48 "P": "process_id",
49 "r": "first_request_line",
50 "s": "response_status",
51 "b": "response_size",
52 "T": "request_time",
53 "Tf": "request_time_frac",
54 "D": "request_time_micro",
55 "i": "request_header",
56 "o": "response_header",
57 }
59 LOG_FORMAT = '%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
60 FORMAT_RE = re.compile(r"%(\{([A-Za-z0-9\-_]+)\}([ioe])|[atPrsbOD]|Tf?)")
61 CLEANUP_RE = re.compile(r"(%[^s])")
62 _FORMAT_CACHE: dict[str, tuple[str, list[KeyMethod]]] = {}
64 _cached_tz: ClassVar[datetime.timezone | None] = None
65 _cached_tz_expires: ClassVar[float] = 0.0
67 def __init__(self, logger: logging.Logger, log_format: str = LOG_FORMAT) -> None:
68 """Initialise the logger.
70 logger is a logger object to be used for logging.
71 log_format is a string with apache compatible log format description.
73 """
74 super().__init__(logger, log_format=log_format)
76 _compiled_format = AccessLogger._FORMAT_CACHE.get(log_format)
77 if not _compiled_format:
78 _compiled_format = self.compile_format(log_format)
79 AccessLogger._FORMAT_CACHE[log_format] = _compiled_format
81 self._log_format, self._methods = _compiled_format
83 def compile_format(self, log_format: str) -> tuple[str, list[KeyMethod]]:
84 """Translate log_format into form usable by modulo formatting
86 All known atoms will be replaced with %s
87 Also methods for formatting of those atoms will be added to
88 _methods in appropriate order
90 For example we have log_format = "%a %t"
91 This format will be translated to "%s %s"
92 Also contents of _methods will be
93 [self._format_a, self._format_t]
94 These method will be called and results will be passed
95 to translated string format.
97 Each _format_* method receive 'args' which is list of arguments
98 given to self.log
100 Exceptions are _format_e, _format_i and _format_o methods which
101 also receive key name (by functools.partial)
103 """
104 # list of (key, method) tuples, we don't use an OrderedDict as users
105 # can repeat the same key more than once
106 methods = list()
108 for atom in self.FORMAT_RE.findall(log_format):
109 if atom[1] == "":
110 format_key1 = self.LOG_FORMAT_MAP[atom[0]]
111 m = getattr(AccessLogger, "_format_%s" % atom[0])
112 key_method = KeyMethod(format_key1, m)
113 else:
114 format_key2 = (self.LOG_FORMAT_MAP[atom[2]], atom[1])
115 m = getattr(AccessLogger, "_format_%s" % atom[2])
116 key_method = KeyMethod(format_key2, functools.partial(m, atom[1]))
118 methods.append(key_method)
120 log_format = self.FORMAT_RE.sub(r"%s", log_format)
121 log_format = self.CLEANUP_RE.sub(r"%\1", log_format)
122 return log_format, methods
124 @staticmethod
125 def _format_i(
126 key: str, request: BaseRequest, response: StreamResponse, time: float
127 ) -> str:
128 # suboptimal, make istr(key) once
129 return request.headers.get(key, "-")
131 @staticmethod
132 def _format_o(
133 key: str, request: BaseRequest, response: StreamResponse, time: float
134 ) -> str:
135 # suboptimal, make istr(key) once
136 return response.headers.get(key, "-")
138 @staticmethod
139 def _format_a(request: BaseRequest, response: StreamResponse, time: float) -> str:
140 ip = request.remote
141 return ip if ip is not None else "-"
143 @classmethod
144 def _get_local_time(cls) -> datetime.datetime:
145 if cls._cached_tz is None or time_mod.time() >= cls._cached_tz_expires:
146 gmtoff = time_mod.localtime().tm_gmtoff
147 cls._cached_tz = tz = datetime.timezone(datetime.timedelta(seconds=gmtoff))
149 now = datetime.datetime.now(tz)
150 # Expire at every 30 mins, as any DST change should occur at 0/30 mins past.
151 d = now + datetime.timedelta(minutes=30)
152 d = d.replace(minute=30 if d.minute >= 30 else 0, second=0, microsecond=0)
153 cls._cached_tz_expires = d.timestamp()
154 return now
156 return datetime.datetime.now(cls._cached_tz)
158 @staticmethod
159 def _format_t(request: BaseRequest, response: StreamResponse, time: float) -> str:
160 now = AccessLogger._get_local_time()
161 start_time = now - datetime.timedelta(seconds=time)
162 return start_time.strftime("[%d/%b/%Y:%H:%M:%S %z]")
164 @staticmethod
165 def _format_P(request: BaseRequest, response: StreamResponse, time: float) -> str:
166 return "<%s>" % os.getpid()
168 @staticmethod
169 def _format_r(request: BaseRequest, response: StreamResponse, time: float) -> str:
170 return f"{request.method} {request.path_qs} HTTP/{request.version.major}.{request.version.minor}"
172 @staticmethod
173 def _format_s(request: BaseRequest, response: StreamResponse, time: float) -> int:
174 return response.status
176 @staticmethod
177 def _format_b(request: BaseRequest, response: StreamResponse, time: float) -> int:
178 return response.body_length
180 @staticmethod
181 def _format_T(request: BaseRequest, response: StreamResponse, time: float) -> str:
182 return str(round(time))
184 @staticmethod
185 def _format_Tf(request: BaseRequest, response: StreamResponse, time: float) -> str:
186 return "%06f" % time
188 @staticmethod
189 def _format_D(request: BaseRequest, response: StreamResponse, time: float) -> str:
190 return str(round(time * 1000000))
192 def _format_line(
193 self, request: BaseRequest, response: StreamResponse, time: float
194 ) -> Iterable[tuple[str, Callable[[BaseRequest, StreamResponse, float], str]]]:
195 return [(key, method(request, response, time)) for key, method in self._methods]
197 @property
198 def enabled(self) -> bool:
199 """Check if logger is enabled."""
200 # Avoid formatting the log line if it will not be emitted.
201 return self.logger.isEnabledFor(logging.INFO)
203 def log(self, request: BaseRequest, response: StreamResponse, time: float) -> None:
204 try:
205 fmt_info = self._format_line(request, response, time)
207 values = list()
208 extra = dict()
209 for key, value in fmt_info:
210 values.append(value)
212 if key.__class__ is str:
213 extra[key] = value
214 else:
215 k1, k2 = key # type: ignore[misc]
216 dct = extra.get(k1, {}) # type: ignore[var-annotated,has-type]
217 dct[k2] = value # type: ignore[index,has-type]
218 extra[k1] = dct # type: ignore[has-type,assignment]
220 self.logger.info(self._log_format % tuple(values), extra=extra)
221 except Exception:
222 self.logger.exception("Error in logging")