Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 45%

115 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 07:17 +0000

1from __future__ import annotations 

2 

3import logging 

4import re 

5import sys 

6import typing as t 

7from datetime import datetime 

8from datetime import timezone 

9 

10if t.TYPE_CHECKING: 

11 from _typeshed.wsgi import WSGIEnvironment 

12 from .wrappers.request import Request 

13 

14_logger: logging.Logger | None = None 

15 

16 

17class _Missing: 

18 def __repr__(self) -> str: 

19 return "no value" 

20 

21 def __reduce__(self) -> str: 

22 return "_missing" 

23 

24 

25_missing = _Missing() 

26 

27 

28def _wsgi_decoding_dance(s: str) -> str: 

29 return s.encode("latin1").decode(errors="replace") 

30 

31 

32def _wsgi_encoding_dance(s: str) -> str: 

33 return s.encode().decode("latin1") 

34 

35 

36def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment: 

37 env = getattr(obj, "environ", obj) 

38 assert isinstance( 

39 env, dict 

40 ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)" 

41 return env 

42 

43 

44def _has_level_handler(logger: logging.Logger) -> bool: 

45 """Check if there is a handler in the logging chain that will handle 

46 the given logger's effective level. 

47 """ 

48 level = logger.getEffectiveLevel() 

49 current = logger 

50 

51 while current: 

52 if any(handler.level <= level for handler in current.handlers): 

53 return True 

54 

55 if not current.propagate: 

56 break 

57 

58 current = current.parent # type: ignore 

59 

60 return False 

61 

62 

63class _ColorStreamHandler(logging.StreamHandler): 

64 """On Windows, wrap stream with Colorama for ANSI style support.""" 

65 

66 def __init__(self) -> None: 

67 try: 

68 import colorama 

69 except ImportError: 

70 stream = None 

71 else: 

72 stream = colorama.AnsiToWin32(sys.stderr) 

73 

74 super().__init__(stream) 

75 

76 

77def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None: 

78 """Log a message to the 'werkzeug' logger. 

79 

80 The logger is created the first time it is needed. If there is no 

81 level set, it is set to :data:`logging.INFO`. If there is no handler 

82 for the logger's effective level, a :class:`logging.StreamHandler` 

83 is added. 

84 """ 

85 global _logger 

86 

87 if _logger is None: 

88 _logger = logging.getLogger("werkzeug") 

89 

90 if _logger.level == logging.NOTSET: 

91 _logger.setLevel(logging.INFO) 

92 

93 if not _has_level_handler(_logger): 

94 _logger.addHandler(_ColorStreamHandler()) 

95 

96 getattr(_logger, type)(message.rstrip(), *args, **kwargs) 

97 

98 

99@t.overload 

100def _dt_as_utc(dt: None) -> None: 

101 ... 

102 

103 

104@t.overload 

105def _dt_as_utc(dt: datetime) -> datetime: 

106 ... 

107 

108 

109def _dt_as_utc(dt: datetime | None) -> datetime | None: 

110 if dt is None: 

111 return dt 

112 

113 if dt.tzinfo is None: 

114 return dt.replace(tzinfo=timezone.utc) 

115 elif dt.tzinfo != timezone.utc: 

116 return dt.astimezone(timezone.utc) 

117 

118 return dt 

119 

120 

121_TAccessorValue = t.TypeVar("_TAccessorValue") 

122 

123 

124class _DictAccessorProperty(t.Generic[_TAccessorValue]): 

125 """Baseclass for `environ_property` and `header_property`.""" 

126 

127 read_only = False 

128 

129 def __init__( 

130 self, 

131 name: str, 

132 default: _TAccessorValue | None = None, 

133 load_func: t.Callable[[str], _TAccessorValue] | None = None, 

134 dump_func: t.Callable[[_TAccessorValue], str] | None = None, 

135 read_only: bool | None = None, 

136 doc: str | None = None, 

137 ) -> None: 

138 self.name = name 

139 self.default = default 

140 self.load_func = load_func 

141 self.dump_func = dump_func 

142 if read_only is not None: 

143 self.read_only = read_only 

144 self.__doc__ = doc 

145 

146 def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]: 

147 raise NotImplementedError 

148 

149 @t.overload 

150 def __get__( 

151 self, instance: None, owner: type 

152 ) -> _DictAccessorProperty[_TAccessorValue]: 

153 ... 

154 

155 @t.overload 

156 def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: 

157 ... 

158 

159 def __get__( 

160 self, instance: t.Any | None, owner: type 

161 ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]: 

162 if instance is None: 

163 return self 

164 

165 storage = self.lookup(instance) 

166 

167 if self.name not in storage: 

168 return self.default # type: ignore 

169 

170 value = storage[self.name] 

171 

172 if self.load_func is not None: 

173 try: 

174 return self.load_func(value) 

175 except (ValueError, TypeError): 

176 return self.default # type: ignore 

177 

178 return value # type: ignore 

179 

180 def __set__(self, instance: t.Any, value: _TAccessorValue) -> None: 

181 if self.read_only: 

182 raise AttributeError("read only property") 

183 

184 if self.dump_func is not None: 

185 self.lookup(instance)[self.name] = self.dump_func(value) 

186 else: 

187 self.lookup(instance)[self.name] = value 

188 

189 def __delete__(self, instance: t.Any) -> None: 

190 if self.read_only: 

191 raise AttributeError("read only property") 

192 

193 self.lookup(instance).pop(self.name, None) 

194 

195 def __repr__(self) -> str: 

196 return f"<{type(self).__name__} {self.name}>" 

197 

198 

199_plain_int_re = re.compile(r"-?\d+", re.ASCII) 

200 

201 

202def _plain_int(value: str) -> int: 

203 """Parse an int only if it is only ASCII digits and ``-``. 

204 

205 This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but 

206 are not allowed in HTTP header values. 

207 

208 Any leading or trailing whitespace is stripped 

209 """ 

210 value = value.strip() 

211 if _plain_int_re.fullmatch(value) is None: 

212 raise ValueError 

213 

214 return int(value)