Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/mako/util.py: 21%

219 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:02 +0000

1# mako/util.py 

2# Copyright 2006-2023 the Mako authors and contributors <see AUTHORS file> 

3# 

4# This module is part of Mako and is released under 

5# the MIT License: http://www.opensource.org/licenses/mit-license.php 

6from ast import parse 

7import codecs 

8import collections 

9import operator 

10import os 

11import re 

12import timeit 

13 

14from .compat import importlib_metadata_get 

15 

16 

17def update_wrapper(decorated, fn): 

18 decorated.__wrapped__ = fn 

19 decorated.__name__ = fn.__name__ 

20 return decorated 

21 

22 

23class PluginLoader: 

24 def __init__(self, group): 

25 self.group = group 

26 self.impls = {} 

27 

28 def load(self, name): 

29 if name in self.impls: 

30 return self.impls[name]() 

31 

32 for impl in importlib_metadata_get(self.group): 

33 if impl.name == name: 

34 self.impls[name] = impl.load 

35 return impl.load() 

36 

37 from mako import exceptions 

38 

39 raise exceptions.RuntimeException( 

40 "Can't load plugin %s %s" % (self.group, name) 

41 ) 

42 

43 def register(self, name, modulepath, objname): 

44 def load(): 

45 mod = __import__(modulepath) 

46 for token in modulepath.split(".")[1:]: 

47 mod = getattr(mod, token) 

48 return getattr(mod, objname) 

49 

50 self.impls[name] = load 

51 

52 

53def verify_directory(dir_): 

54 """create and/or verify a filesystem directory.""" 

55 

56 tries = 0 

57 

58 while not os.path.exists(dir_): 

59 try: 

60 tries += 1 

61 os.makedirs(dir_, 0o755) 

62 except: 

63 if tries > 5: 

64 raise 

65 

66 

67def to_list(x, default=None): 

68 if x is None: 

69 return default 

70 if not isinstance(x, (list, tuple)): 

71 return [x] 

72 else: 

73 return x 

74 

75 

76class memoized_property: 

77 

78 """A read-only @property that is only evaluated once.""" 

79 

80 def __init__(self, fget, doc=None): 

81 self.fget = fget 

82 self.__doc__ = doc or fget.__doc__ 

83 self.__name__ = fget.__name__ 

84 

85 def __get__(self, obj, cls): 

86 if obj is None: 

87 return self 

88 obj.__dict__[self.__name__] = result = self.fget(obj) 

89 return result 

90 

91 

92class memoized_instancemethod: 

93 

94 """Decorate a method memoize its return value. 

95 

96 Best applied to no-arg methods: memoization is not sensitive to 

97 argument values, and will always return the same value even when 

98 called with different arguments. 

99 

100 """ 

101 

102 def __init__(self, fget, doc=None): 

103 self.fget = fget 

104 self.__doc__ = doc or fget.__doc__ 

105 self.__name__ = fget.__name__ 

106 

107 def __get__(self, obj, cls): 

108 if obj is None: 

109 return self 

110 

111 def oneshot(*args, **kw): 

112 result = self.fget(obj, *args, **kw) 

113 

114 def memo(*a, **kw): 

115 return result 

116 

117 memo.__name__ = self.__name__ 

118 memo.__doc__ = self.__doc__ 

119 obj.__dict__[self.__name__] = memo 

120 return result 

121 

122 oneshot.__name__ = self.__name__ 

123 oneshot.__doc__ = self.__doc__ 

124 return oneshot 

125 

126 

127class SetLikeDict(dict): 

128 

129 """a dictionary that has some setlike methods on it""" 

130 

131 def union(self, other): 

132 """produce a 'union' of this dict and another (at the key level). 

133 

134 values in the second dict take precedence over that of the first""" 

135 x = SetLikeDict(**self) 

136 x.update(other) 

137 return x 

138 

139 

140class FastEncodingBuffer: 

141 

142 """a very rudimentary buffer that is faster than StringIO, 

143 and supports unicode data.""" 

144 

145 def __init__(self, encoding=None, errors="strict"): 

146 self.data = collections.deque() 

147 self.encoding = encoding 

148 self.delim = "" 

149 self.errors = errors 

150 self.write = self.data.append 

151 

152 def truncate(self): 

153 self.data = collections.deque() 

154 self.write = self.data.append 

155 

156 def getvalue(self): 

157 if self.encoding: 

158 return self.delim.join(self.data).encode( 

159 self.encoding, self.errors 

160 ) 

161 else: 

162 return self.delim.join(self.data) 

163 

164 

165class LRUCache(dict): 

166 

167 """A dictionary-like object that stores a limited number of items, 

168 discarding lesser used items periodically. 

169 

170 this is a rewrite of LRUCache from Myghty to use a periodic timestamp-based 

171 paradigm so that synchronization is not really needed. the size management 

172 is inexact. 

173 """ 

174 

175 class _Item: 

176 def __init__(self, key, value): 

177 self.key = key 

178 self.value = value 

179 self.timestamp = timeit.default_timer() 

180 

181 def __repr__(self): 

182 return repr(self.value) 

183 

184 def __init__(self, capacity, threshold=0.5): 

185 self.capacity = capacity 

186 self.threshold = threshold 

187 

188 def __getitem__(self, key): 

189 item = dict.__getitem__(self, key) 

190 item.timestamp = timeit.default_timer() 

191 return item.value 

192 

193 def values(self): 

194 return [i.value for i in dict.values(self)] 

195 

196 def setdefault(self, key, value): 

197 if key in self: 

198 return self[key] 

199 self[key] = value 

200 return value 

201 

202 def __setitem__(self, key, value): 

203 item = dict.get(self, key) 

204 if item is None: 

205 item = self._Item(key, value) 

206 dict.__setitem__(self, key, item) 

207 else: 

208 item.value = value 

209 self._manage_size() 

210 

211 def _manage_size(self): 

212 while len(self) > self.capacity + self.capacity * self.threshold: 

213 bytime = sorted( 

214 dict.values(self), 

215 key=operator.attrgetter("timestamp"), 

216 reverse=True, 

217 ) 

218 for item in bytime[self.capacity :]: 

219 try: 

220 del self[item.key] 

221 except KeyError: 

222 # if we couldn't find a key, most likely some other thread 

223 # broke in on us. loop around and try again 

224 break 

225 

226 

227# Regexp to match python magic encoding line 

228_PYTHON_MAGIC_COMMENT_re = re.compile( 

229 r"[ \t\f]* \# .* coding[=:][ \t]*([-\w.]+)", re.VERBOSE 

230) 

231 

232 

233def parse_encoding(fp): 

234 """Deduce the encoding of a Python source file (binary mode) from magic 

235 comment. 

236 

237 It does this in the same way as the `Python interpreter`__ 

238 

239 .. __: http://docs.python.org/ref/encodings.html 

240 

241 The ``fp`` argument should be a seekable file object in binary mode. 

242 """ 

243 pos = fp.tell() 

244 fp.seek(0) 

245 try: 

246 line1 = fp.readline() 

247 has_bom = line1.startswith(codecs.BOM_UTF8) 

248 if has_bom: 

249 line1 = line1[len(codecs.BOM_UTF8) :] 

250 

251 m = _PYTHON_MAGIC_COMMENT_re.match(line1.decode("ascii", "ignore")) 

252 if not m: 

253 try: 

254 parse(line1.decode("ascii", "ignore")) 

255 except (ImportError, SyntaxError): 

256 # Either it's a real syntax error, in which case the source 

257 # is not valid python source, or line2 is a continuation of 

258 # line1, in which case we don't want to scan line2 for a magic 

259 # comment. 

260 pass 

261 else: 

262 line2 = fp.readline() 

263 m = _PYTHON_MAGIC_COMMENT_re.match( 

264 line2.decode("ascii", "ignore") 

265 ) 

266 

267 if has_bom: 

268 if m: 

269 raise SyntaxError( 

270 "python refuses to compile code with both a UTF8" 

271 " byte-order-mark and a magic encoding comment" 

272 ) 

273 return "utf_8" 

274 elif m: 

275 return m.group(1) 

276 else: 

277 return None 

278 finally: 

279 fp.seek(pos) 

280 

281 

282def sorted_dict_repr(d): 

283 """repr() a dictionary with the keys in order. 

284 

285 Used by the lexer unit test to compare parse trees based on strings. 

286 

287 """ 

288 keys = list(d.keys()) 

289 keys.sort() 

290 return "{" + ", ".join("%r: %r" % (k, d[k]) for k in keys) + "}" 

291 

292 

293def restore__ast(_ast): 

294 """Attempt to restore the required classes to the _ast module if it 

295 appears to be missing them 

296 """ 

297 if hasattr(_ast, "AST"): 

298 return 

299 _ast.PyCF_ONLY_AST = 2 << 9 

300 m = compile( 

301 """\ 

302def foo(): pass 

303class Bar: pass 

304if False: pass 

305baz = 'mako' 

3061 + 2 - 3 * 4 / 5 

3076 // 7 % 8 << 9 >> 10 

30811 & 12 ^ 13 | 14 

30915 and 16 or 17 

310-baz + (not +18) - ~17 

311baz and 'foo' or 'bar' 

312(mako is baz == baz) is not baz != mako 

313mako > baz < mako >= baz <= mako 

314mako in baz not in mako""", 

315 "<unknown>", 

316 "exec", 

317 _ast.PyCF_ONLY_AST, 

318 ) 

319 _ast.Module = type(m) 

320 

321 for cls in _ast.Module.__mro__: 

322 if cls.__name__ == "mod": 

323 _ast.mod = cls 

324 elif cls.__name__ == "AST": 

325 _ast.AST = cls 

326 

327 _ast.FunctionDef = type(m.body[0]) 

328 _ast.ClassDef = type(m.body[1]) 

329 _ast.If = type(m.body[2]) 

330 

331 _ast.Name = type(m.body[3].targets[0]) 

332 _ast.Store = type(m.body[3].targets[0].ctx) 

333 _ast.Str = type(m.body[3].value) 

334 

335 _ast.Sub = type(m.body[4].value.op) 

336 _ast.Add = type(m.body[4].value.left.op) 

337 _ast.Div = type(m.body[4].value.right.op) 

338 _ast.Mult = type(m.body[4].value.right.left.op) 

339 

340 _ast.RShift = type(m.body[5].value.op) 

341 _ast.LShift = type(m.body[5].value.left.op) 

342 _ast.Mod = type(m.body[5].value.left.left.op) 

343 _ast.FloorDiv = type(m.body[5].value.left.left.left.op) 

344 

345 _ast.BitOr = type(m.body[6].value.op) 

346 _ast.BitXor = type(m.body[6].value.left.op) 

347 _ast.BitAnd = type(m.body[6].value.left.left.op) 

348 

349 _ast.Or = type(m.body[7].value.op) 

350 _ast.And = type(m.body[7].value.values[0].op) 

351 

352 _ast.Invert = type(m.body[8].value.right.op) 

353 _ast.Not = type(m.body[8].value.left.right.op) 

354 _ast.UAdd = type(m.body[8].value.left.right.operand.op) 

355 _ast.USub = type(m.body[8].value.left.left.op) 

356 

357 _ast.Or = type(m.body[9].value.op) 

358 _ast.And = type(m.body[9].value.values[0].op) 

359 

360 _ast.IsNot = type(m.body[10].value.ops[0]) 

361 _ast.NotEq = type(m.body[10].value.ops[1]) 

362 _ast.Is = type(m.body[10].value.left.ops[0]) 

363 _ast.Eq = type(m.body[10].value.left.ops[1]) 

364 

365 _ast.Gt = type(m.body[11].value.ops[0]) 

366 _ast.Lt = type(m.body[11].value.ops[1]) 

367 _ast.GtE = type(m.body[11].value.ops[2]) 

368 _ast.LtE = type(m.body[11].value.ops[3]) 

369 

370 _ast.In = type(m.body[12].value.ops[0]) 

371 _ast.NotIn = type(m.body[12].value.ops[1]) 

372 

373 

374def read_file(path, mode="rb"): 

375 with open(path, mode) as fp: 

376 return fp.read() 

377 

378 

379def read_python_file(path): 

380 fp = open(path, "rb") 

381 try: 

382 encoding = parse_encoding(fp) 

383 data = fp.read() 

384 if encoding: 

385 data = data.decode(encoding) 

386 return data 

387 finally: 

388 fp.close()