Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/hypothesis/internal/lambda_sources.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1# This file is part of Hypothesis, which may be found at 

2# https://github.com/HypothesisWorks/hypothesis/ 

3# 

4# Copyright the Hypothesis Authors. 

5# Individual contributors are listed in AUTHORS.rst and the git log. 

6# 

7# This Source Code Form is subject to the terms of the Mozilla Public License, 

8# v. 2.0. If a copy of the MPL was not distributed with this file, You can 

9# obtain one at https://mozilla.org/MPL/2.0/. 

10 

import ast
import dis
import hashlib
import inspect
import linecache
import sys
import textwrap
from collections.abc import Callable, MutableMapping
from inspect import Parameter
from typing import Any
from weakref import WeakKeyDictionary

from hypothesis.internal import reflection
from hypothesis.internal.cache import LRUCache

24 

# We have several levels of caching for lambda descriptions.
# * LAMBDA_DESCRIPTION_CACHE maps a lambda f to its description
#   _lambda_description(f). Note that _lambda_description(f) may not be
#   identical to f as it appears in the source code file.
# * LAMBDA_DIGEST_DESCRIPTION_CACHE maps _function_key(f, bounded_size=True) to
#   a pair (description, failed_locations), where description is
#   _lambda_description(f) and failed_locations is the set of
#   (co_filename, co_firstlineno) pairs for which an "<unknown>" description
#   has already been produced.
#   _function_key implements something close to "ast equality": two
#   syntactically identical (minus whitespace etc) lambdas appearing in
#   different files have the same key. Cache hits here provide a fast path
#   which avoids ast-parsing syntactic lambdas we've seen before. Two lambdas
#   with the same _function_key will not have different _lambda_descriptions -
#   if they do, that's a bug here.
# * AST_LAMBDAS_CACHE maps source code lines (as a tuple of strings) to a list
#   of the lambdas found in that source code. A cache hit here avoids reparsing
#   the ast.
LAMBDA_DESCRIPTION_CACHE: MutableMapping[Callable, str] = WeakKeyDictionary()
LAMBDA_DIGEST_DESCRIPTION_CACHE: LRUCache[
    tuple[Any, ...], tuple[str, set[tuple[str, int]]]
] = LRUCache(max_size=1000)
AST_LAMBDAS_CACHE: LRUCache[tuple[str, ...], list[ast.Lambda]] = LRUCache(
    max_size=100
)

41 

42 

def extract_all_lambdas(tree):
    """Collect every ``ast.Lambda`` node reachable from ``tree``.

    Nested lambdas are included, both those in another lambda's body and
    those appearing in its argument defaults (for example the inner lambda in
    ``lambda g=(lambda: 1): g()``).

    Returns a list of ``ast.Lambda`` nodes. An enclosing lambda always comes
    before the lambdas nested inside it.
    """
    lambdas = []

    class Visitor(ast.NodeVisitor):
        def visit_Lambda(self, node):
            lambdas.append(node)
            # generic_visit walks both ``node.args`` and ``node.body``;
            # visiting only the body would miss lambdas used as default
            # argument values.
            self.generic_visit(node)

    Visitor().visit(tree)
    return lambdas

54 

55 

def extract_all_attributes(tree):
    """Return all ``ast.Attribute`` nodes in ``tree``, in depth-first order.

    For a dotted chain such as ``a.b.c`` the outer attribute (``a.b.c``) is
    listed before the inner one (``a.b``).
    """

    class _AttributeCollector(ast.NodeVisitor):
        def __init__(self):
            self.found = []

        def visit_Attribute(self, node):
            self.found.append(node)
            # Keep descending so that chained attribute accesses are seen too.
            self.visit(node.value)

    collector = _AttributeCollector()
    collector.visit(tree)
    return collector.found

66 

67 

68def _function_key(f, *, bounded_size=False, ignore_name=False): 

69 """Returns a digest that differentiates functions that have different sources. 

70 

71 Either a function or a code object may be passed. If code object, default 

72 arg/kwarg values are not recoverable - this is the best we can do, and is 

73 sufficient for the use case of comparing nested lambdas. 

74 """ 

75 try: 

76 code = f.__code__ 

77 defaults_repr = repr((f.__defaults__, f.__kwdefaults__)) 

78 except AttributeError: 

79 code = f 

80 defaults_repr = () 

81 consts_repr = repr(code.co_consts) 

82 if bounded_size: 

83 # Compress repr to avoid keeping arbitrarily large strings pinned as cache 

84 # keys. We don't do this unconditionally because hashing takes time, and is 

85 # not necessary if the key is used just for comparison (and is not stored). 

86 if len(consts_repr) > 48: 

87 consts_repr = hashlib.sha384(consts_repr.encode()).digest() 

88 if len(defaults_repr) > 48: 

89 defaults_repr = hashlib.sha384(defaults_repr.encode()).digest() 

90 return ( 

91 consts_repr, 

92 defaults_repr, 

93 code.co_argcount, 

94 code.co_kwonlyargcount, 

95 code.co_code, 

96 code.co_names, 

97 code.co_varnames, 

98 code.co_freevars, 

99 ignore_name or code.co_name, 

100 ) 

101 

102 

103class _op: 

104 # Opcodes, from dis.opmap. These may change between major versions. 

105 NOP = 9 

106 LOAD_FAST = 85 

107 LOAD_FAST_LOAD_FAST = 88 

108 LOAD_FAST_BORROW = 86 

109 LOAD_FAST_BORROW_LOAD_FAST_BORROW = 87 

110 

111 

def _normalize_code(f, l):
    """Return ``l``'s code object, rewritten towards ``f``'s where equivalent.

    ``f`` is the reference function and ``l`` the function to normalize; both
    must have a ``__code__`` attribute. Two things are normalized:

    * bytecode: where the two instruction streams differ by one of the known
      equivalent opcode sequences below, ``f``'s form is spliced into a copy
      of ``l``'s bytecode;
    * constants: a code-object constant of ``l`` is replaced by the constant
      at the same position in ``f`` when both have the same ``_function_key``.

    Returns ``l.__code__.replace(co_code=..., co_consts=...)``; neither
    function is modified.
    """
    # A small selection of possible peephole code transformations, based on what
    # is actually seen to differ between compilations in our test suite. Each
    # entry contains two equivalent opcode sequences, plus a condition
    # function called with their respective oparg sequences, which must return
    # true for the transformation to be valid. A condition of None means the
    # transformation is always valid.
    Checker = Callable[[list[int], list[int]], bool]
    transforms: list[tuple[list[int], list[int], Checker | None]] = [
        ([_op.NOP], [], lambda a, b: True),
        (
            [_op.LOAD_FAST, _op.LOAD_FAST],
            [_op.LOAD_FAST_LOAD_FAST],
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
        (
            [_op.LOAD_FAST_BORROW, _op.LOAD_FAST_BORROW],
            [_op.LOAD_FAST_BORROW_LOAD_FAST_BORROW],
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
    ]
    # augment with converse
    transforms += [
        (
            ops_b,
            ops_a,
            condition and (lambda a, b, condition=condition: condition(b, a)),
        )
        for ops_a, ops_b, condition in transforms
    ]

    # Normalize equivalent code. We assume that each bytecode op is 2 bytes,
    # which is the case since Python 3.6. Since the opcodes values may change
    # between version, there is a risk that a transform may not be equivalent
    # -- even so, the risk of a bad transform producing a false positive is
    # minuscule.
    co_code = list(l.__code__.co_code)
    f_code = list(f.__code__.co_code)

    def alternating(code, i, n):
        # Every second byte starting at i: the opcodes when i is even, the
        # opargs when i is odd.
        return code[i : i + 2 * n : 2]

    # NOTE(review): the scan starts at the second instruction; presumably the
    # first one is a fixed prologue that never differs - confirm.
    i = 2
    while i < max(len(co_code), len(f_code)):
        # note that co_code is mutated in loop
        if i < min(len(co_code), len(f_code)) and f_code[i] == co_code[i]:
            i += 2
        else:
            for op1, op2, condition in transforms:
                if (
                    op1 == alternating(f_code, i, len(op1))
                    and op2 == alternating(co_code, i, len(op2))
                    and (
                        condition is None
                        or condition(
                            alternating(f_code, i + 1, len(op1)),
                            alternating(co_code, i + 1, len(op2)),
                        )
                    )
                ):
                    break
            else:
                # no point in continuing since the bytecodes are different anyway
                break
            # Splice in the transform and continue
            co_code = (
                co_code[:i] + f_code[i : i + 2 * len(op1)] + co_code[i + 2 * len(op2) :]
            )
            i += 2 * len(op1)

    # Normalize consts, in particular replace any lambda consts with the
    # corresponding const from the template function, IFF they have the same
    # source key.

    f_consts = f.__code__.co_consts
    l_consts = l.__code__.co_consts
    if len(f_consts) == len(l_consts) and any(
        inspect.iscode(l_const) for l_const in l_consts
    ):
        normalized_consts = []
        for f_const, l_const in zip(f_consts, l_consts, strict=True):
            if (
                inspect.iscode(l_const)
                and inspect.iscode(f_const)
                and _function_key(f_const) == _function_key(l_const)
            ):
                # If the lambdas are compiled from the same source, make them be the
                # same object so that the toplevel lambdas end up equal. Note that
                # default arguments are not available on the code objects. But if the
                # default arguments differ then the lambdas must also differ in other
                # ways, since default arguments are set up from bytecode and constants.
                # I.e., this appears to be safe wrt false positives.
                normalized_consts.append(f_const)
            else:
                normalized_consts.append(l_const)
    else:
        normalized_consts = l_consts

    return l.__code__.replace(
        co_code=bytes(co_code),
        co_consts=tuple(normalized_consts),
    )

210 

211 

# Maps id(module) -> name in sys.modules; rebuilt by _mimic_lambda_from_node
# whenever its size no longer matches len(sys.modules).
_module_map: dict[int, str] = {}


def _mimic_lambda_from_node(f, node):
    """Compile the lambda source in ``node`` in a context resembling ``f``'s.

    ``f`` is the function being described and ``node`` an ``ast.Lambda``
    candidate for its source. The node is unparsed and compiled against a
    copy of ``f.__globals__``, with ``f``'s default values, closure contents
    and referenced modules made available. If - and only if - ``node`` was
    the source of ``f``, the result is indistinguishable from ``f`` itself
    (to a casual observer such as _function_key).

    Returns the newly compiled function. Errors raised while compiling or
    evaluating the source propagate to the caller.
    """
    # The source evaluated below is regenerated from the AST of the file in
    # which ``f`` is defined, i.e. code that has already been executed.
    f_globals = f.__globals__.copy()
    f_code = f.__code__
    source = ast.unparse(node)

    # Install values for non-literal argument defaults. Thankfully, these are
    # always captured by value - so there is no interaction with the closure.
    if f.__defaults__:
        for f_default, l_default in zip(
            f.__defaults__, node.args.defaults, strict=True
        ):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f_default
    if f.__kwdefaults__:  # pragma: no cover
        for l_default, l_varname in zip(
            node.args.kw_defaults, node.args.kwonlyargs, strict=True
        ):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f.__kwdefaults__[l_varname.arg]

    # CPython's compiler treats known imports differently than normal globals,
    # so check if we use attributes from globals that are modules (if so, we
    # import them explicitly and redundantly in the exec below)
    referenced_modules = [
        (local_name, module)
        for attr in extract_all_attributes(node)
        if (
            isinstance(attr.value, ast.Name)
            and (local_name := attr.value.id)
            and inspect.ismodule(module := f_globals.get(local_name))
        )
    ]

    if not f_code.co_freevars and not referenced_modules:
        compiled = eval(source, f_globals)
    else:
        if f_code.co_freevars:
            # We have to reconstruct a local closure. The closure will have
            # the same values as the original function, although this is not
            # required for source/bytecode equality.
            f_globals |= {
                f"__lc{i}": c.cell_contents for i, c in enumerate(f.__closure__)
            }
            captures = [f"{name}=__lc{i}" for i, name in enumerate(f_code.co_freevars)]
            capture_str = ";".join(captures) + ";"
        else:
            capture_str = ""

        import_str = ""
        if referenced_modules:
            # We add import statements for all referenced modules, since that
            # influences the compiled code. The assumption is that these modules
            # were explicitly imported, not assigned, in the source - if not,
            # this may/will give a different compilation result.
            global _module_map
            if len(_module_map) != len(sys.modules):  # pragma: no branch
                _module_map = {id(module): name for name, module in sys.modules.items()}
            imports = [
                (module_name, local_name)
                for local_name, module in referenced_modules
                if (module_name := _module_map.get(id(module))) is not None
            ]
            import_fragments = [f"{name} as {asname}" for name, asname in set(imports)]
            # If none of the referenced modules is registered in sys.modules,
            # there is nothing to import. Emitting a bare "import" would be
            # a guaranteed SyntaxError, so compile without imports instead.
            if import_fragments:
                import_str = f"import {','.join(import_fragments)}\n"

        exec_str = (
            f"{import_str}def __construct_lambda(): {capture_str} return ({source})"
        )
        exec(exec_str, f_globals)
        compiled = f_globals["__construct_lambda"]()

    return compiled

290 

291 

def _lambda_code_matches_node(f, node):
    """Report whether compiling ``node`` reproduces the code of ``f``.

    The candidate is compiled with _mimic_lambda_from_node and compared to
    ``f`` by _function_key; if the keys differ, the comparison is repeated
    after normalizing the candidate's code with _normalize_code. Returns
    False if compiling the candidate raises NameError or SyntaxError.
    """
    try:
        candidate_fn = _mimic_lambda_from_node(f, node)
    except (NameError, SyntaxError):  # pragma: no cover # source is generated from ast
        return False

    wanted = _function_key(f)
    if wanted == _function_key(candidate_fn):
        return True

    # Try harder: allow for equivalent-but-different bytecode and constants.
    candidate_fn.__code__ = _normalize_code(f, candidate_fn)
    return wanted == _function_key(candidate_fn)

302 

303 

304def _check_unknown_perfectly_aligned_lambda(candidate): 

305 # This is a monkeypatch point for our self-tests, to make unknown 

306 # lambdas raise. 

307 pass 

308 

309 

def _lambda_description(f, leeway=50, *, fail_if_confused_with_perfect_candidate=False):
    """Describe the lambda ``f`` as ``lambda <signature>: <body>``.

    The body is recovered by locating an ``ast.Lambda`` in ``f``'s source file
    whose arguments match ``f``'s signature and whose compiled code matches
    ``f``. Only lambdas within ``leeway`` lines of ``f``'s reported position
    are considered in the whole-file search. If no source is available or no
    candidate matches, the body is rendered as ``<unknown>``.

    ``fail_if_confused_with_perfect_candidate`` is accepted but not used in
    this function's body.
    """
    f = getattr(f, "__wrapped_target", f)

    # You might be wondering how a lambda can have a return-type annotation?
    # The answer is that we add this at runtime, in new_given_signature(),
    # and we do support strange choices as applying @given() to a lambda.
    sig = inspect.signature(f)
    assert sig.return_annotation in (Parameter.empty, None), sig

    # Using pytest-xdist on Python 3.13, there's an entry in the linecache for
    # file "<string>", which then returns nonsense to getsource. Discard it.
    linecache.cache.pop("<string>", None)

    def format_lambda(body):
        # The signature is more informative than the corresponding ast.unparse
        # output in the case of default argument values, so the signature is
        # combined with the unparsed body.
        if sig.parameters:
            return f"lambda {str(sig)[1:-1]}: {body}"
        return f"lambda: {body}"

    def signature_fits(candidate):
        return reflection.ast_arguments_matches_signature(candidate.args, sig)

    if_confused = format_lambda("<unknown>")

    try:
        source_lines, lineno0 = inspect.findsource(f)
    except OSError:
        return if_confused
    source_lines = tuple(source_lines)  # make it hashable

    try:
        all_lambdas = AST_LAMBDAS_CACHE[source_lines]
    except KeyError:
        # The source isn't already parsed, so first try the cheap route of
        # parsing just the local block. If that yields no code-identical
        # lambda, continue with the full parse.
        local_lines = inspect.getblock(source_lines[lineno0:])
        # The fairly common ".map(lambda x: ...)" case: the partial block
        # isn't valid syntax, but it might be once the leading "." is removed.
        local_block = textwrap.dedent("".join(local_lines)).removeprefix(".")

        try:
            local_tree = ast.parse(local_block)
        except SyntaxError:
            local_tree = None
        if local_tree is not None:
            for candidate in extract_all_lambdas(local_tree):
                if signature_fits(candidate) and _lambda_code_matches_node(
                    f, candidate
                ):
                    return format_lambda(ast.unparse(candidate.body))

        # Local parse failed or didn't produce a match, go ahead with the full parse
        try:
            tree = ast.parse("".join(source_lines))
        except SyntaxError:
            all_lambdas = []
        else:
            all_lambdas = extract_all_lambdas(tree)
        AST_LAMBDAS_CACHE[source_lines] = all_lambdas

    # Candidates close enough to the reported line, nearest first.
    target_line = lineno0 + 1
    aligned_lambdas = sorted(
        (
            candidate
            for candidate in all_lambdas
            if candidate.lineno - leeway <= target_line <= candidate.lineno + leeway
            and signature_fits(candidate)
        ),
        key=lambda c: abs(target_line - c.lineno),
    )
    for candidate in aligned_lambdas:
        if _lambda_code_matches_node(f, candidate):
            return format_lambda(ast.unparse(candidate.body))

    # None of the aligned lambdas match perfectly in generated code.
    if aligned_lambdas and aligned_lambdas[0].lineno == target_line:
        _check_unknown_perfectly_aligned_lambda(aligned_lambdas[0])

    return if_confused

391 

392 

def lambda_description(f):
    """
    Returns a syntactically-valid expression describing `f`. This is often, but
    not always, the exact lambda definition string which appears in the source
    code: the lambda is parsed into an ast and rendered back with `ast.unparse`,
    so whitespace, double vs single quotes, etc. may differ.

    Returns a string indicating an unknown body if the parsing gets confused in
    any way. Results are cached per function object and per function digest.
    """
    try:
        return LAMBDA_DESCRIPTION_CACHE[f]
    except KeyError:
        pass

    digest = _function_key(f, bounded_size=True)
    where = (f.__code__.co_filename, f.__code__.co_firstlineno)
    try:
        cached_text, failed_locations = LAMBDA_DIGEST_DESCRIPTION_CACHE[digest]
    except KeyError:
        failed_locations = set()
    else:
        # A digest hit is only trusted if it carries a good (known)
        # description, or if it is unknown but this exact source location has
        # already been tried before.
        is_known = "<unknown>" not in cached_text
        if is_known or where in failed_locations:
            LAMBDA_DESCRIPTION_CACHE[f] = cached_text
            return cached_text

    description = _lambda_description(f)
    LAMBDA_DESCRIPTION_CACHE[f] = description
    if "<unknown>" not in description:
        failed_locations.clear()  # we have a good description now
    else:
        failed_locations.add(where)
    LAMBDA_DIGEST_DESCRIPTION_CACHE[digest] = (description, failed_locations)
    return description