Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mako/util.py: 21% of 219 statements (coverage.py v7.2.7, created at 2023-06-07 06:02 +0000)
# mako/util.py
# Copyright 2006-2023 the Mako authors and contributors <see AUTHORS file>
#
# This module is part of Mako and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from ast import parse
import codecs
import collections
import operator
import os
import re
import timeit

from .compat import importlib_metadata_get


def update_wrapper(decorated, fn):
    decorated.__wrapped__ = fn
    decorated.__name__ = fn.__name__
    return decorated


class PluginLoader:
    def __init__(self, group):
        self.group = group
        self.impls = {}

    def load(self, name):
        if name in self.impls:
            return self.impls[name]()

        for impl in importlib_metadata_get(self.group):
            if impl.name == name:
                self.impls[name] = impl.load
                return impl.load()

        from mako import exceptions

        raise exceptions.RuntimeException(
            "Can't load plugin %s %s" % (self.group, name)
        )

    def register(self, name, modulepath, objname):
        def load():
            mod = __import__(modulepath)
            for token in modulepath.split(".")[1:]:
                mod = getattr(mod, token)
            return getattr(mod, objname)

        self.impls[name] = load
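

# Illustrative usage sketch (not part of mako): register() stores a lazy
# loader keyed by name, and load() imports and returns the object on first
# use. The group name and the os.path/join pair below are arbitrary
# placeholders, not real mako plugin entry points.
def _example_plugin_loader():
    loader = PluginLoader("mako.example_group")
    loader.register("join", "os.path", "join")
    join = loader.load("join")              # resolves to os.path.join
    return join("templates", "index.html")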


def verify_directory(dir_):
    """create and/or verify a filesystem directory."""

    tries = 0

    while not os.path.exists(dir_):
        try:
            tries += 1
            os.makedirs(dir_, 0o755)
        except:
            if tries > 5:
                raise
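

# Illustrative usage sketch (not part of mako): verify_directory() creates
# the directory tree if it is missing, retrying a few times to tolerate races
# with other processes creating the same path. tempfile is used here only to
# get a scratch location.
def _example_verify_directory():
    import tempfile

    target = os.path.join(tempfile.mkdtemp(), "nested", "cache_dir")
    verify_directory(target)
    return os.path.isdir(target)            # -> True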


def to_list(x, default=None):
    if x is None:
        return default
    if not isinstance(x, (list, tuple)):
        return [x]
    else:
        return x
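

# Illustrative usage sketch (not part of mako): to_list() normalizes None and
# scalar values; lists and tuples pass through unchanged.
def _example_to_list():
    return (
        to_list("single"),              # -> ['single']
        to_list(("a", "b")),            # -> ('a', 'b')
        to_list(None, default=[]),      # -> []
    )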


class memoized_property:

    """A read-only @property that is only evaluated once."""

    def __init__(self, fget, doc=None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        if obj is None:
            return self
        obj.__dict__[self.__name__] = result = self.fget(obj)
        return result
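

# Illustrative usage sketch (not part of mako): the first attribute access
# runs fget and stores the result in the instance __dict__, so later accesses
# bypass the descriptor entirely.
def _example_memoized_property():
    class Report:
        @memoized_property
        def total(self):
            return sum(range(1000))

    r = Report()
    first = r.total         # computed and cached on the instance
    second = r.total        # read straight from r.__dict__
    return first == second == 499500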


class memoized_instancemethod:

    """Decorate a method to memoize its return value.

    Best applied to no-arg methods: memoization is not sensitive to
    argument values, and will always return the same value even when
    called with different arguments.

    """

    def __init__(self, fget, doc=None):
        self.fget = fget
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        if obj is None:
            return self

        def oneshot(*args, **kw):
            result = self.fget(obj, *args, **kw)

            def memo(*a, **kw):
                return result

            memo.__name__ = self.__name__
            memo.__doc__ = self.__doc__
            obj.__dict__[self.__name__] = memo
            return result

        oneshot.__name__ = self.__name__
        oneshot.__doc__ = self.__doc__
        return oneshot
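

# Illustrative usage sketch (not part of mako): the first call computes and
# caches the result; later calls return that cached value even when the
# arguments differ, which is why it is best used on no-arg methods.
def _example_memoized_instancemethod():
    class Lookup:
        calls = 0

        @memoized_instancemethod
        def fetch(self, key="default"):
            Lookup.calls += 1
            return key

    lk = Lookup()
    first = lk.fetch("a")       # computes and caches "a"
    second = lk.fetch("b")      # still "a"; the cached result ignores new args
    return first, second, Lookup.calls     # -> ('a', 'a', 1)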


class SetLikeDict(dict):

    """a dictionary that has some setlike methods on it"""

    def union(self, other):
        """produce a 'union' of this dict and another (at the key level).

        values in the second dict take precedence over those of the first"""
        x = SetLikeDict(**self)
        x.update(other)
        return x
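

# Illustrative usage sketch (not part of mako): union() merges two mappings at
# the key level, with the second mapping's values winning on shared keys.
def _example_set_like_dict():
    base = SetLikeDict(a=1, b=2)
    merged = base.union({"b": 20, "c": 3})
    return merged                   # -> {'a': 1, 'b': 20, 'c': 3}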


class FastEncodingBuffer:

    """a very rudimentary buffer that is faster than StringIO,
    and supports unicode data."""

    def __init__(self, encoding=None, errors="strict"):
        self.data = collections.deque()
        self.encoding = encoding
        self.delim = ""
        self.errors = errors
        self.write = self.data.append

    def truncate(self):
        self.data = collections.deque()
        self.write = self.data.append

    def getvalue(self):
        if self.encoding:
            return self.delim.join(self.data).encode(
                self.encoding, self.errors
            )
        else:
            return self.delim.join(self.data)
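

# Illustrative usage sketch (not part of mako): the buffer appends unicode
# chunks to a deque and joins (and optionally encodes) them in getvalue().
def _example_fast_encoding_buffer():
    buf = FastEncodingBuffer(encoding="utf-8")
    buf.write("héllo ")
    buf.write("world")
    return buf.getvalue()           # -> b'h\xc3\xa9llo world'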


class LRUCache(dict):

    """A dictionary-like object that stores a limited number of items,
    discarding less recently used items periodically.

    This is a rewrite of LRUCache from Myghty to use a periodic timestamp-based
    paradigm so that synchronization is not really needed. The size management
    is inexact.
    """

    class _Item:
        def __init__(self, key, value):
            self.key = key
            self.value = value
            self.timestamp = timeit.default_timer()

        def __repr__(self):
            return repr(self.value)

    def __init__(self, capacity, threshold=0.5):
        self.capacity = capacity
        self.threshold = threshold

    def __getitem__(self, key):
        item = dict.__getitem__(self, key)
        item.timestamp = timeit.default_timer()
        return item.value

    def values(self):
        return [i.value for i in dict.values(self)]

    def setdefault(self, key, value):
        if key in self:
            return self[key]
        self[key] = value
        return value

    def __setitem__(self, key, value):
        item = dict.get(self, key)
        if item is None:
            item = self._Item(key, value)
            dict.__setitem__(self, key, item)
        else:
            item.value = value
        self._manage_size()

    def _manage_size(self):
        while len(self) > self.capacity + self.capacity * self.threshold:
            bytime = sorted(
                dict.values(self),
                key=operator.attrgetter("timestamp"),
                reverse=True,
            )
            for item in bytime[self.capacity :]:
                try:
                    del self[item.key]
                except KeyError:
                    # if we couldn't find a key, most likely some other thread
                    # broke in on us. loop around and try again
                    break
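

# Illustrative usage sketch (not part of mako): with capacity=2 and
# threshold=0.5, pruning starts once the dict holds more than
# 2 + 2 * 0.5 = 3 items, and it trims back to the two most recently used
# entries (by timestamp).
def _example_lru_cache():
    cache = LRUCache(2, threshold=0.5)
    cache["a"] = 1
    cache["b"] = 2
    cache["c"] = 3          # len == 3: still within capacity + threshold
    _ = cache["a"]          # access refreshes "a"'s timestamp
    cache["d"] = 4          # len == 4 > 3: older entries are pruned
    return sorted(cache)    # typically ['a', 'd']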


# Regexp to match python magic encoding line
_PYTHON_MAGIC_COMMENT_re = re.compile(
    r"[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)", re.VERBOSE
)


def parse_encoding(fp):
    """Deduce the encoding of a Python source file (binary mode) from magic
    comment.

    It does this in the same way as the `Python interpreter`__

    .. __: http://docs.python.org/ref/encodings.html

    The ``fp`` argument should be a seekable file object in binary mode.
    """
    pos = fp.tell()
    fp.seek(0)
    try:
        line1 = fp.readline()
        has_bom = line1.startswith(codecs.BOM_UTF8)
        if has_bom:
            line1 = line1[len(codecs.BOM_UTF8) :]

        m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode("ascii", "ignore"))
        if not m:
            try:
                parse(line1.decode("ascii", "ignore"))
            except (ImportError, SyntaxError):
                # Either it's a real syntax error, in which case the source
                # is not valid python source, or line2 is a continuation of
                # line1, in which case we don't want to scan line2 for a magic
                # comment.
                pass
            else:
                line2 = fp.readline()
                m = _PYTHON_MAGIC_COMMENT_re.match(
                    line2.decode("ascii", "ignore")
                )

        if has_bom:
            if m:
                raise SyntaxError(
                    "python refuses to compile code with both a UTF8"
                    " byte-order-mark and a magic encoding comment"
                )
            return "utf_8"
        elif m:
            return m.group(1)
        else:
            return None
    finally:
        fp.seek(pos)
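

# Illustrative usage sketch (not part of mako): parse_encoding() expects a
# seekable binary file object; io.BytesIO stands in for a real source file.
def _example_parse_encoding():
    import io

    src = io.BytesIO(b"# -*- coding: latin-1 -*-\nname = 'value'\n")
    return parse_encoding(src)      # -> "latin-1"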


def sorted_dict_repr(d):
    """repr() a dictionary with the keys in order.

    Used by the lexer unit test to compare parse trees based on strings.

    """
    keys = list(d.keys())
    keys.sort()
    return "{" + ", ".join("%r: %r" % (k, d[k]) for k in keys) + "}"
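

# Illustrative usage sketch (not part of mako): the repr is deterministic
# regardless of insertion order, which makes string comparisons stable.
def _example_sorted_dict_repr():
    return sorted_dict_repr({"b": 2, "a": 1})   # -> "{'a': 1, 'b': 2}"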


def restore__ast(_ast):
    """Attempt to restore the required classes to the _ast module if it
    appears to be missing them
    """
    if hasattr(_ast, "AST"):
        return
    _ast.PyCF_ONLY_AST = 2 << 9
    m = compile(
        """\
def foo(): pass
class Bar: pass
if False: pass
baz = 'mako'
1 + 2 - 3 * 4 / 5
6 // 7 % 8 << 9 >> 10
11 & 12 ^ 13 | 14
15 and 16 or 17
-baz + (not +18) - ~17
baz and 'foo' or 'bar'
(mako is baz == baz) is not baz != mako
mako > baz < mako >= baz <= mako
mako in baz not in mako""",
        "<unknown>",
        "exec",
        _ast.PyCF_ONLY_AST,
    )
    _ast.Module = type(m)

    for cls in _ast.Module.__mro__:
        if cls.__name__ == "mod":
            _ast.mod = cls
        elif cls.__name__ == "AST":
            _ast.AST = cls

    _ast.FunctionDef = type(m.body[0])
    _ast.ClassDef = type(m.body[1])
    _ast.If = type(m.body[2])

    _ast.Name = type(m.body[3].targets[0])
    _ast.Store = type(m.body[3].targets[0].ctx)
    _ast.Str = type(m.body[3].value)

    _ast.Sub = type(m.body[4].value.op)
    _ast.Add = type(m.body[4].value.left.op)
    _ast.Div = type(m.body[4].value.right.op)
    _ast.Mult = type(m.body[4].value.right.left.op)

    _ast.RShift = type(m.body[5].value.op)
    _ast.LShift = type(m.body[5].value.left.op)
    _ast.Mod = type(m.body[5].value.left.left.op)
    _ast.FloorDiv = type(m.body[5].value.left.left.left.op)

    _ast.BitOr = type(m.body[6].value.op)
    _ast.BitXor = type(m.body[6].value.left.op)
    _ast.BitAnd = type(m.body[6].value.left.left.op)

    _ast.Or = type(m.body[7].value.op)
    _ast.And = type(m.body[7].value.values[0].op)

    _ast.Invert = type(m.body[8].value.right.op)
    _ast.Not = type(m.body[8].value.left.right.op)
    _ast.UAdd = type(m.body[8].value.left.right.operand.op)
    _ast.USub = type(m.body[8].value.left.left.op)

    _ast.Or = type(m.body[9].value.op)
    _ast.And = type(m.body[9].value.values[0].op)

    _ast.IsNot = type(m.body[10].value.ops[0])
    _ast.NotEq = type(m.body[10].value.ops[1])
    _ast.Is = type(m.body[10].value.left.ops[0])
    _ast.Eq = type(m.body[10].value.left.ops[1])

    _ast.Gt = type(m.body[11].value.ops[0])
    _ast.Lt = type(m.body[11].value.ops[1])
    _ast.GtE = type(m.body[11].value.ops[2])
    _ast.LtE = type(m.body[11].value.ops[3])

    _ast.In = type(m.body[12].value.ops[0])
    _ast.NotIn = type(m.body[12].value.ops[1])


def read_file(path, mode="rb"):
    with open(path, mode) as fp:
        return fp.read()


def read_python_file(path):
    fp = open(path, "rb")
    try:
        encoding = parse_encoding(fp)
        data = fp.read()
        if encoding:
            data = data.decode(encoding)
        return data
    finally:
        fp.close()
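

# Illustrative usage sketch (not part of mako): read_python_file() decodes a
# source file using its declared magic-coding comment when one is present;
# tempfile is used here only to create a throwaway source file.
def _example_read_python_file():
    import tempfile

    with tempfile.NamedTemporaryFile("wb", suffix=".py", delete=False) as tmp:
        tmp.write(b"# -*- coding: latin-1 -*-\nname = '\xe9'\n")
    return read_python_file(tmp.name)   # decoded to str via latin-1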