Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/lambda_sources.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

11import ast 

12import hashlib 

13import inspect 

14import linecache 

15import sys 

16import textwrap 

17from collections.abc import MutableMapping 

18from inspect import Parameter 

19from typing import Any, Callable, Optional 

20from weakref import WeakKeyDictionary 

21 

22from hypothesis.internal import reflection 

23from hypothesis.internal.cache import LRUCache 

24 

25# we have several levels of caching for lambda descriptions. 

26# * LAMBDA_DESCRIPTION_CACHE maps a lambda f to its description _lambda_description(f). 

27# Note that _lambda_description(f) may not be identical to f as it appears in the 

28# source code file. 

29# * LAMBDA_DIGEST_DESCRIPTION_CACHE maps _function_key(f) to _lambda_description(f). 

30# _function_key implements something close to "ast equality": 

31# two syntactically identical (minus whitespace etc) lambdas appearing in 

32# different files have the same key. Cache hits here provide a fast path which 

33# avoids ast-parsing syntactic lambdas we've seen before. Two lambdas with the 

34# same _function_key will not have different _lambda_descriptions - if 

35# they do, that's a bug here. 

36# * AST_LAMBDAS_CACHE maps source code lines to a list of the lambdas found in 

37# that source code. A cache hit here avoids reparsing the ast. 

38LAMBDA_DESCRIPTION_CACHE: MutableMapping[Callable, str] = WeakKeyDictionary() 

39LAMBDA_DIGEST_DESCRIPTION_CACHE: LRUCache[tuple[Any], str] = LRUCache(max_size=1000) 

40AST_LAMBDAS_CACHE: LRUCache[tuple[str], list[ast.Lambda]] = LRUCache(max_size=100) 

41 

42 

43def extract_all_lambdas(tree): 

44 lambdas = [] 

45 

46 class Visitor(ast.NodeVisitor): 

47 

48 def visit_Lambda(self, node): 

49 lambdas.append(node) 

50 self.visit(node.body) 

51 

52 Visitor().visit(tree) 

53 return lambdas 

54 

55 

56def extract_all_attributes(tree): 

57 attributes = [] 

58 

59 class Visitor(ast.NodeVisitor): 

60 def visit_Attribute(self, node): 

61 attributes.append(node) 

62 self.visit(node.value) 

63 

64 Visitor().visit(tree) 

65 return attributes 

66 

67 

68def _function_key(f, *, bounded_size=False): 

69 """Returns a digest that differentiates functions that have different sources. 

70 

71 Either a function or a code object may be passed. If code object, default 

72 arg/kwarg values are not recoverable - this is the best we can do, and is 

73 sufficient for the use case of comparing nested lambdas. 

74 """ 

75 try: 

76 code = f.__code__ 

77 defaults_repr = repr((f.__defaults__, f.__kwdefaults__)) 

78 except AttributeError: 

79 code = f 

80 defaults_repr = () 

81 consts_repr = repr(code.co_consts) 

82 if bounded_size: 

83 # Compress repr to avoid keeping arbitrarily large strings pinned as cache 

84 # keys. We don't do this unconditionally because hashing takes time, and is 

85 # not necessary if the key is used just for comparison (and is not stored). 

86 if len(consts_repr) > 48: 

87 consts_repr = hashlib.sha384(consts_repr.encode()).digest() 

88 if len(defaults_repr) > 48: 

89 defaults_repr = hashlib.sha384(defaults_repr.encode()).digest() 

90 return ( 

91 consts_repr, 

92 defaults_repr, 

93 code.co_argcount, 

94 code.co_kwonlyargcount, 

95 code.co_code, 

96 code.co_names, 

97 code.co_varnames, 

98 code.co_freevars, 

99 code.co_name, 

100 ) 

101 

102 

103class _op: 

104 # Opcodes, from dis.opmap. These may change between major versions. 

105 NOP = 9 

106 LOAD_FAST = 85 

107 LOAD_FAST_LOAD_FAST = 88 

108 LOAD_FAST_BORROW = 86 

109 LOAD_FAST_BORROW_LOAD_FAST_BORROW = 87 

110 

111 

112def _normalize_code(f, l): 

113 # A small selection of possible peephole code transformations, based on what 

114 # is actually seen to differ between compilations in our test suite. Each 

115 # entry contains two equivalent opcode sequences, plus a condition 

116 # function called with their respective oparg sequences, which must return 

117 # true for the transformation to be valid. 

118 Checker = Callable[[list[int], list[int]], bool] 

119 transforms: tuple[list[int], list[int], Optional[Checker]] = [ 

120 ([_op.NOP], [], lambda a, b: True), 

121 ( 

122 [_op.LOAD_FAST, _op.LOAD_FAST], 

123 [_op.LOAD_FAST_LOAD_FAST], 

124 lambda a, b: a == [b[0] >> 4, b[0] & 15], 

125 ), 

126 ( 

127 [_op.LOAD_FAST_BORROW, _op.LOAD_FAST_BORROW], 

128 [_op.LOAD_FAST_BORROW_LOAD_FAST_BORROW], 

129 lambda a, b: a == [b[0] >> 4, b[0] & 15], 

130 ), 

131 ] 

132 # augment with converse 

133 transforms += [ 

134 ( 

135 ops_b, 

136 ops_a, 

137 condition and (lambda a, b, condition=condition: condition(b, a)), 

138 ) 

139 for ops_a, ops_b, condition in transforms 

140 ] 

141 

142 # Normalize equivalent code. We assume that each bytecode op is 2 bytes, 

143 # which is the case since Python 3.6. Since the opcodes values may change 

144 # between version, there is a risk that a transform may not be equivalent 

145 # -- even so, the risk of a bad transform producing a false positive is 

146 # minuscule. 

147 co_code = list(l.__code__.co_code) 

148 f_code = list(f.__code__.co_code) 

149 

150 def alternating(code, i, n): 

151 return code[i : i + 2 * n : 2] 

152 

153 i = 2 

154 while i < max(len(co_code), len(f_code)): 

155 # note that co_code is mutated in loop 

156 if i < min(len(co_code), len(f_code)) and f_code[i] == co_code[i]: 

157 i += 2 

158 else: 

159 for op1, op2, condition in transforms: 

160 if ( 

161 op1 == alternating(f_code, i, len(op1)) 

162 and op2 == alternating(co_code, i, len(op2)) 

163 and condition( 

164 alternating(f_code, i + 1, len(op1)), 

165 alternating(co_code, i + 1, len(op2)), 

166 ) 

167 ): 

168 break 

169 else: 

170 # no point in continuing since the bytecodes are different anyway 

171 break 

172 # Splice in the transform and continue 

173 co_code = ( 

174 co_code[:i] + f_code[i : i + 2 * len(op1)] + co_code[i + 2 * len(op2) :] 

175 ) 

176 i += 2 * len(op1) 

177 

178 # Normalize consts, in particular replace any lambda consts with the 

179 # corresponding const from the template function, IFF they have the same 

180 # source key. 

181 

182 f_consts = f.__code__.co_consts 

183 l_consts = l.__code__.co_consts 

184 if len(f_consts) == len(l_consts) and any( 

185 inspect.iscode(l_const) for l_const in l_consts 

186 ): 

187 normalized_consts = [] 

188 for f_const, l_const in zip(f_consts, l_consts): 

189 if ( 

190 inspect.iscode(l_const) 

191 and inspect.iscode(f_const) 

192 and _function_key(f_const) == _function_key(l_const) 

193 ): 

194 # If the lambdas are compiled from the same source, make them be the 

195 # same object so that the toplevel lambdas end up equal. Note that 

196 # default arguments are not available on the code objects. But if the 

197 # default arguments differ then the lambdas must also differ in other 

198 # ways, since default arguments are set up from bytecode and constants. 

199 # I.e., this appears to be safe wrt false positives. 

200 normalized_consts.append(f_const) 

201 else: 

202 normalized_consts.append(l_const) 

203 else: 

204 normalized_consts = l_consts 

205 

206 return l.__code__.replace( 

207 co_code=bytes(co_code), 

208 co_consts=tuple(normalized_consts), 

209 ) 

210 

211 

212_module_map: dict[int, str] = {} 

213 

214 

215def _mimic_lambda_from_node(f, node): 

216 # Compile the source (represented by an ast.Lambda node) in a context that 

217 # as far as possible mimics the context that f was compiled in. If - and 

218 # only if - this was the source of f then the result is indistinguishable 

219 # from f itself (to a casual observer such as _function_key). 

220 f_globals = f.__globals__.copy() 

221 f_code = f.__code__ 

222 source = ast.unparse(node) 

223 

224 # Install values for non-literal argument defaults. Thankfully, these are 

225 # always captured by value - so there is no interaction with the closure. 

226 if f.__defaults__: 

227 for f_default, l_default in zip(f.__defaults__, node.args.defaults): 

228 if isinstance(l_default, ast.Name): 

229 f_globals[l_default.id] = f_default 

230 if f.__kwdefaults__: # pragma: no cover 

231 for l_default, l_varname in zip(node.args.kw_defaults, node.args.kwonlyargs): 

232 if isinstance(l_default, ast.Name): 

233 f_globals[l_default.id] = f.__kwdefaults__[l_varname.arg] 

234 

235 # CPython's compiler treats known imports differently than normal globals, 

236 # so check if we use attributes from globals that are modules (if so, we 

237 # import them explicitly and redundantly in the exec below) 

238 referenced_modules = [ 

239 (local_name, module) 

240 for attr in extract_all_attributes(node) 

241 if ( 

242 isinstance(attr.value, ast.Name) 

243 and (local_name := attr.value.id) 

244 and inspect.ismodule(module := f_globals.get(local_name)) 

245 ) 

246 ] 

247 

248 if not f_code.co_freevars and not referenced_modules: 

249 compiled = eval(source, f_globals) 

250 else: 

251 if f_code.co_freevars: 

252 # We have to reconstruct a local closure. The closure will have 

253 # the same values as the original function, although this is not 

254 # required for source/bytecode equality. 

255 f_globals |= { 

256 f"__lc{i}": c.cell_contents for i, c in enumerate(f.__closure__) 

257 } 

258 captures = [f"{name}=__lc{i}" for i, name in enumerate(f_code.co_freevars)] 

259 capture_str = ";".join(captures) + ";" 

260 else: 

261 capture_str = "" 

262 if referenced_modules: 

263 # We add import statements for all referenced modules, since that 

264 # influences the compiled code. The assumption is that these modules 

265 # were explicitly imported, not assigned, in the source - if not, 

266 # this may/will give a different compilation result. 

267 global _module_map 

268 if len(_module_map) != len(sys.modules): # pragma: no branch 

269 _module_map = {id(module): name for name, module in sys.modules.items()} 

270 imports = [ 

271 (module_name, local_name) 

272 for local_name, module in referenced_modules 

273 if (module_name := _module_map.get(id(module))) is not None 

274 ] 

275 import_fragments = [f"{name} as {asname}" for name, asname in set(imports)] 

276 import_str = f"import {','.join(import_fragments)}\n" 

277 else: 

278 import_str = "" 

279 exec_str = ( 

280 f"{import_str}def __construct_lambda(): {capture_str} return ({source})" 

281 ) 

282 exec(exec_str, f_globals) 

283 compiled = f_globals["__construct_lambda"]() 

284 

285 return compiled 

286 

287 

288def _lambda_code_matches_node(f, node): 

289 try: 

290 compiled = _mimic_lambda_from_node(f, node) 

291 except (NameError, SyntaxError): # pragma: no cover # source is generated from ast 

292 return False 

293 if _function_key(f) == _function_key(compiled): 

294 return True 

295 # Try harder 

296 compiled.__code__ = _normalize_code(f, compiled) 

297 return _function_key(f) == _function_key(compiled) 

298 

299 

300def _check_unknown_perfectly_aligned_lambda(candidate): 

301 # This is a monkeypatch point for our self-tests, to make unknown 

302 # lambdas raise. 

303 pass 

304 

305 

306def _lambda_description(f, leeway=50, *, fail_if_confused_with_perfect_candidate=False): 

307 if hasattr(f, "__wrapped_target"): 

308 f = f.__wrapped_target 

309 

310 # You might be wondering how a lambda can have a return-type annotation? 

311 # The answer is that we add this at runtime, in new_given_signature(), 

312 # and we do support strange choices as applying @given() to a lambda. 

313 sig = inspect.signature(f) 

314 assert sig.return_annotation in (Parameter.empty, None), sig 

315 

316 # Using pytest-xdist on Python 3.13, there's an entry in the linecache for 

317 # file "<string>", which then returns nonsense to getsource. Discard it. 

318 linecache.cache.pop("<string>", None) 

319 

320 def format_lambda(body): 

321 # The signature is more informative than the corresponding ast.unparse 

322 # output in the case of default argument values, so add the signature 

323 # to the unparsed body 

324 return ( 

325 f"lambda {str(sig)[1:-1]}: {body}" if sig.parameters else f"lambda: {body}" 

326 ) 

327 

328 if_confused = format_lambda("<unknown>") 

329 

330 try: 

331 source_lines, lineno0 = inspect.findsource(f) 

332 source_lines = tuple(source_lines) # make it hashable 

333 except OSError: 

334 return if_confused 

335 

336 try: 

337 all_lambdas = AST_LAMBDAS_CACHE[source_lines] 

338 except KeyError: 

339 # The source isn't already parsed, so we try to shortcut by parsing just 

340 # the local block. If that fails to produce a code-identical lambda, 

341 # fall through to the full parse. 

342 local_lines = inspect.getblock(source_lines[lineno0:]) 

343 local_block = textwrap.dedent("".join(local_lines)) 

344 # The fairly common ".map(lambda x: ...)" case. This partial block 

345 # isn't valid syntax, but it might be if we remove the leading ".". 

346 local_block = local_block.removeprefix(".") 

347 

348 try: 

349 local_tree = ast.parse(local_block) 

350 except SyntaxError: 

351 pass 

352 else: 

353 local_lambdas = extract_all_lambdas(local_tree) 

354 for candidate in local_lambdas: 

355 if reflection.ast_arguments_matches_signature( 

356 candidate.args, sig 

357 ) and _lambda_code_matches_node(f, candidate): 

358 return format_lambda(ast.unparse(candidate.body)) 

359 

360 # Local parse failed or didn't produce a match, go ahead with the full parse 

361 try: 

362 tree = ast.parse("".join(source_lines)) 

363 except SyntaxError: 

364 all_lambdas = [] 

365 else: 

366 all_lambdas = extract_all_lambdas(tree) 

367 AST_LAMBDAS_CACHE[source_lines] = all_lambdas 

368 

369 aligned_lambdas = [] 

370 for candidate in all_lambdas: 

371 if ( 

372 candidate.lineno - leeway <= lineno0 + 1 <= candidate.lineno + leeway 

373 and reflection.ast_arguments_matches_signature(candidate.args, sig) 

374 ): 

375 aligned_lambdas.append(candidate) 

376 

377 aligned_lambdas.sort(key=lambda c: abs(lineno0 + 1 - c.lineno)) 

378 for candidate in aligned_lambdas: 

379 if _lambda_code_matches_node(f, candidate): 

380 return format_lambda(ast.unparse(candidate.body)) 

381 

382 # None of the aligned lambdas match perfectly in generated code. 

383 if aligned_lambdas and aligned_lambdas[0].lineno == lineno0 + 1: 

384 _check_unknown_perfectly_aligned_lambda(aligned_lambdas[0]) 

385 

386 return if_confused 

387 

388 

389def lambda_description(f): 

390 """ 

391 Returns a syntactically-valid expression describing `f`. This is often, but 

392 not always, the exact lambda definition string which appears in the source code. 

393 The difference comes from parsing the lambda ast into `tree` and then returning 

394 the result of `ast.unparse(tree)`, which may differ in whitespace, double vs 

395 single quotes, etc. 

396 

397 Returns a string indicating an unknown body if the parsing gets confused in any way. 

398 """ 

399 try: 

400 return LAMBDA_DESCRIPTION_CACHE[f] 

401 except KeyError: 

402 pass 

403 

404 key = _function_key(f, bounded_size=True) 

405 location = (f.__code__.co_filename, f.__code__.co_firstlineno) 

406 try: 

407 description, failed_locations = LAMBDA_DIGEST_DESCRIPTION_CACHE[key] 

408 except KeyError: 

409 failed_locations = set() 

410 else: 

411 # We got a hit in the digests cache, but only use it if either it has 

412 # a good (known) description, or if it is unknown but we already tried 

413 # to parse its exact source location before. 

414 if "<unknown>" not in description or location in failed_locations: 

415 # use the cached result 

416 LAMBDA_DESCRIPTION_CACHE[f] = description 

417 return description 

418 

419 description = _lambda_description(f) 

420 LAMBDA_DESCRIPTION_CACHE[f] = description 

421 if "<unknown>" in description: 

422 failed_locations.add(location) 

423 else: 

424 failed_locations.clear() # we have a good description now 

425 LAMBDA_DIGEST_DESCRIPTION_CACHE[key] = description, failed_locations 

426 return description