Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

111 statements  

1from __future__ import annotations 

2 

3import logging 

4import re 

5import sys 

6import typing as t 

7from datetime import datetime 

8from datetime import timezone 

9 

10if t.TYPE_CHECKING: 

11 from _typeshed.wsgi import WSGIEnvironment 

12 

13 from .wrappers.request import Request 

14 

15_logger: logging.Logger | None = None 

16 

17 

18class _Missing: 

19 def __repr__(self) -> str: 

20 return "no value" 

21 

22 def __reduce__(self) -> str: 

23 return "_missing" 

24 

25 

26_missing = _Missing() 

27 

28 

29def _wsgi_decoding_dance(s: str) -> str: 

30 return s.encode("latin1").decode(errors="replace") 

31 

32 

33def _wsgi_encoding_dance(s: str) -> str: 

34 return s.encode().decode("latin1") 

35 

36 

37def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment: 

38 env = getattr(obj, "environ", obj) 

39 assert isinstance( 

40 env, dict 

41 ), f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)" 

42 return env 

43 

44 

45def _has_level_handler(logger: logging.Logger) -> bool: 

46 """Check if there is a handler in the logging chain that will handle 

47 the given logger's effective level. 

48 """ 

49 level = logger.getEffectiveLevel() 

50 current = logger 

51 

52 while current: 

53 if any(handler.level <= level for handler in current.handlers): 

54 return True 

55 

56 if not current.propagate: 

57 break 

58 

59 current = current.parent # type: ignore 

60 

61 return False 

62 

63 

64class _ColorStreamHandler(logging.StreamHandler): # type: ignore[type-arg] 

65 """On Windows, wrap stream with Colorama for ANSI style support.""" 

66 

67 def __init__(self) -> None: 

68 try: 

69 import colorama 

70 except ImportError: 

71 stream = None 

72 else: 

73 stream = colorama.AnsiToWin32(sys.stderr) 

74 

75 super().__init__(stream) 

76 

77 

78def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None: 

79 """Log a message to the 'werkzeug' logger. 

80 

81 The logger is created the first time it is needed. If there is no 

82 level set, it is set to :data:`logging.INFO`. If there is no handler 

83 for the logger's effective level, a :class:`logging.StreamHandler` 

84 is added. 

85 """ 

86 global _logger 

87 

88 if _logger is None: 

89 _logger = logging.getLogger("werkzeug") 

90 

91 if _logger.level == logging.NOTSET: 

92 _logger.setLevel(logging.INFO) 

93 

94 if not _has_level_handler(_logger): 

95 _logger.addHandler(_ColorStreamHandler()) 

96 

97 getattr(_logger, type)(message.rstrip(), *args, **kwargs) 

98 

99 

100@t.overload 

101def _dt_as_utc(dt: None) -> None: ... 

102 

103 

104@t.overload 

105def _dt_as_utc(dt: datetime) -> datetime: ... 

106 

107 

108def _dt_as_utc(dt: datetime | None) -> datetime | None: 

109 if dt is None: 

110 return dt 

111 

112 if dt.tzinfo is None: 

113 return dt.replace(tzinfo=timezone.utc) 

114 elif dt.tzinfo != timezone.utc: 

115 return dt.astimezone(timezone.utc) 

116 

117 return dt 

118 

119 

120_TAccessorValue = t.TypeVar("_TAccessorValue") 

121 

122 

123class _DictAccessorProperty(t.Generic[_TAccessorValue]): 

124 """Baseclass for `environ_property` and `header_property`.""" 

125 

126 read_only = False 

127 

128 def __init__( 

129 self, 

130 name: str, 

131 default: _TAccessorValue | None = None, 

132 load_func: t.Callable[[str], _TAccessorValue] | None = None, 

133 dump_func: t.Callable[[_TAccessorValue], str] | None = None, 

134 read_only: bool | None = None, 

135 doc: str | None = None, 

136 ) -> None: 

137 self.name = name 

138 self.default = default 

139 self.load_func = load_func 

140 self.dump_func = dump_func 

141 if read_only is not None: 

142 self.read_only = read_only 

143 self.__doc__ = doc 

144 

145 def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]: 

146 raise NotImplementedError 

147 

148 @t.overload 

149 def __get__( 

150 self, instance: None, owner: type 

151 ) -> _DictAccessorProperty[_TAccessorValue]: ... 

152 

153 @t.overload 

154 def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue: ... 

155 

156 def __get__( 

157 self, instance: t.Any | None, owner: type 

158 ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]: 

159 if instance is None: 

160 return self 

161 

162 storage = self.lookup(instance) 

163 

164 if self.name not in storage: 

165 return self.default # type: ignore 

166 

167 value = storage[self.name] 

168 

169 if self.load_func is not None: 

170 try: 

171 return self.load_func(value) 

172 except (ValueError, TypeError): 

173 return self.default # type: ignore 

174 

175 return value # type: ignore 

176 

177 def __set__(self, instance: t.Any, value: _TAccessorValue) -> None: 

178 if self.read_only: 

179 raise AttributeError("read only property") 

180 

181 if self.dump_func is not None: 

182 self.lookup(instance)[self.name] = self.dump_func(value) 

183 else: 

184 self.lookup(instance)[self.name] = value 

185 

186 def __delete__(self, instance: t.Any) -> None: 

187 if self.read_only: 

188 raise AttributeError("read only property") 

189 

190 self.lookup(instance).pop(self.name, None) 

191 

192 def __repr__(self) -> str: 

193 return f"<{type(self).__name__} {self.name}>" 

194 

195 

196_plain_int_re = re.compile(r"-?\d+", re.ASCII) 

197 

198 

199def _plain_int(value: str) -> int: 

200 """Parse an int only if it is only ASCII digits and ``-``. 

201 

202 This disallows ``+``, ``_``, and non-ASCII digits, which are accepted by ``int`` but 

203 are not allowed in HTTP header values. 

204 

205 Any leading or trailing whitespace is stripped 

206 """ 

207 value = value.strip() 

208 if _plain_int_re.fullmatch(value) is None: 

209 raise ValueError 

210 

211 return int(value)