# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import ast
import hashlib
import inspect
import linecache
import sys
import textwrap
from collections.abc import Callable, MutableMapping
from inspect import Parameter
from typing import Any
from weakref import WeakKeyDictionary

from hypothesis.internal import reflection
from hypothesis.internal.cache import LRUCache

# We have several levels of caching for lambda descriptions.
# * LAMBDA_DESCRIPTION_CACHE maps a lambda f to its description _lambda_description(f).
#   Note that _lambda_description(f) may not be identical to f as it appears in the
#   source code file.
# * LAMBDA_DIGEST_DESCRIPTION_CACHE maps _function_key(f) to _lambda_description(f),
#   together with the set of source locations where that description came out unknown.
#   _function_key implements something close to "ast equality":
#   two syntactically identical (minus whitespace etc) lambdas appearing in
#   different files have the same key. Cache hits here provide a fast path which
#   avoids ast-parsing syntactically identical lambdas we've seen before. Two lambdas
#   with the same _function_key will not have different _lambda_descriptions - if
#   they do, that's a bug here.
# * AST_LAMBDAS_CACHE maps source code lines to a list of the lambdas found in
#   that source code. A cache hit here avoids reparsing the ast.
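# For example (purely illustrative): if two different test files both contain the
# textually identical argument `lambda x: x + 1`, the two function objects share a
# _function_key, so describing the second one hits LAMBDA_DIGEST_DESCRIPTION_CACHE
# and skips ast parsing entirely.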
LAMBDA_DESCRIPTION_CACHE: MutableMapping[Callable, str] = WeakKeyDictionary()
LAMBDA_DIGEST_DESCRIPTION_CACHE: LRUCache[tuple[Any, ...], tuple[str, set]] = LRUCache(
    max_size=1000
)
AST_LAMBDAS_CACHE: LRUCache[tuple[str, ...], list[ast.Lambda]] = LRUCache(max_size=100)


def extract_all_lambdas(tree):
    lambdas = []

    class Visitor(ast.NodeVisitor):
        def visit_Lambda(self, node):
            lambdas.append(node)
            self.visit(node.body)

    Visitor().visit(tree)
    return lambdas


def extract_all_attributes(tree):
    attributes = []

    class Visitor(ast.NodeVisitor):
        def visit_Attribute(self, node):
            attributes.append(node)
            self.visit(node.value)

    Visitor().visit(tree)
    return attributes


def _function_key(f, *, bounded_size=False, ignore_name=False):
    """Returns a digest that differentiates functions that have different sources.

    Either a function or a code object may be passed. If a code object is passed,
    default arg/kwarg values are not recoverable - this is the best we can do, and
    is sufficient for the use case of comparing nested lambdas.
    """
    try:
        code = f.__code__
        defaults_repr = repr((f.__defaults__, f.__kwdefaults__))
    except AttributeError:
        code = f
        defaults_repr = ()
    consts_repr = repr(code.co_consts)
    if bounded_size:
        # Compress the reprs to avoid keeping arbitrarily large strings pinned as
        # cache keys. We don't do this unconditionally because hashing takes time,
        # and is not necessary if the key is used just for comparison (and is not
        # stored).
        if len(consts_repr) > 48:
            consts_repr = hashlib.sha384(consts_repr.encode()).digest()
        if len(defaults_repr) > 48:
            defaults_repr = hashlib.sha384(defaults_repr.encode()).digest()
    return (
        consts_repr,
        defaults_repr,
        code.co_argcount,
        code.co_kwonlyargcount,
        code.co_code,
        code.co_names,
        code.co_varnames,
        code.co_freevars,
        ignore_name or code.co_name,
    )


class _op:
    # Opcodes, from dis.opmap. These may change between Python versions.
    NOP = 9
    LOAD_FAST = 85
    LOAD_FAST_LOAD_FAST = 88
    LOAD_FAST_BORROW = 86
    LOAD_FAST_BORROW_LOAD_FAST_BORROW = 87


def _normalize_code(f, l):
    # A small selection of possible peephole code transformations, based on what
    # is actually seen to differ between compilations in our test suite. Each
    # entry contains two equivalent opcode sequences, plus a condition
    # function called with their respective oparg sequences, which must return
    # true for the transformation to be valid.
    Checker = Callable[[list[int], list[int]], bool]
    transforms: list[tuple[list[int], list[int], Checker]] = [
        ([_op.NOP], [], lambda a, b: True),
        (
            [_op.LOAD_FAST, _op.LOAD_FAST],
            [_op.LOAD_FAST_LOAD_FAST],
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
        (
            [_op.LOAD_FAST_BORROW, _op.LOAD_FAST_BORROW],
            [_op.LOAD_FAST_BORROW_LOAD_FAST_BORROW],
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
    ]
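    # In the paired transforms above, the single super-instruction packs both
    # variable indices into one oparg byte - high nibble then low nibble - which
    # is what the `>> 4` / `& 15` conditions check.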
    # augment with the converse of each transform
    transforms += [
        (
            ops_b,
            ops_a,
            condition and (lambda a, b, condition=condition: condition(b, a)),
        )
        for ops_a, ops_b, condition in transforms
    ]

    # Normalize equivalent code. We assume that each bytecode op is 2 bytes,
    # which is the case since Python 3.6. Since opcode values may change between
    # versions, there is a risk that a transform may not be equivalent -- even
    # so, the risk of a bad transform producing a false positive is minuscule.
    co_code = list(l.__code__.co_code)
    f_code = list(f.__code__.co_code)

    def alternating(code, i, n):
        return code[i : i + 2 * n : 2]

    i = 2
    while i < max(len(co_code), len(f_code)):
        # note that co_code is mutated in the loop
        if i < min(len(co_code), len(f_code)) and f_code[i] == co_code[i]:
            i += 2
        else:
            for op1, op2, condition in transforms:
                if (
                    op1 == alternating(f_code, i, len(op1))
                    and op2 == alternating(co_code, i, len(op2))
                    and condition(
                        alternating(f_code, i + 1, len(op1)),
                        alternating(co_code, i + 1, len(op2)),
                    )
                ):
                    break
            else:
                # no point in continuing since the bytecodes are different anyway
                break
            # Splice in the transform and continue
            co_code = (
                co_code[:i] + f_code[i : i + 2 * len(op1)] + co_code[i + 2 * len(op2) :]
            )
            i += 2 * len(op1)

    # Normalize consts, in particular replace any lambda consts with the
    # corresponding const from the template function, IFF they have the same
    # source key.

    f_consts = f.__code__.co_consts
    l_consts = l.__code__.co_consts
    if len(f_consts) == len(l_consts) and any(
        inspect.iscode(l_const) for l_const in l_consts
    ):
        normalized_consts = []
        for f_const, l_const in zip(f_consts, l_consts, strict=True):
            if (
                inspect.iscode(l_const)
                and inspect.iscode(f_const)
                and _function_key(f_const) == _function_key(l_const)
            ):
                # If the lambdas are compiled from the same source, make them be the
                # same object so that the toplevel lambdas end up equal. Note that
                # default arguments are not available on the code objects. But if the
                # default arguments differ then the lambdas must also differ in other
                # ways, since default arguments are set up from bytecode and constants.
                # I.e., this appears to be safe wrt false positives.
                normalized_consts.append(f_const)
            else:
                normalized_consts.append(l_const)
    else:
        normalized_consts = l_consts

    return l.__code__.replace(
        co_code=bytes(co_code),
        co_consts=tuple(normalized_consts),
    )


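# Maps id(module) -> the module's name in sys.modules; rebuilt lazily in
# _mimic_lambda_from_node whenever sys.modules has changed size.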
_module_map: dict[int, str] = {}


def _mimic_lambda_from_node(f, node):
    # Compile the source (represented by an ast.Lambda node) in a context that
    # as far as possible mimics the context that f was compiled in. If - and
    # only if - this was the source of f then the result is indistinguishable
    # from f itself (to a casual observer such as _function_key).
    f_globals = f.__globals__.copy()
    f_code = f.__code__
    source = ast.unparse(node)

    # Install values for non-literal argument defaults. Thankfully, these are
    # always captured by value - so there is no interaction with the closure.
    if f.__defaults__:
        for f_default, l_default in zip(
            f.__defaults__, node.args.defaults, strict=True
        ):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f_default
    if f.__kwdefaults__:  # pragma: no cover
        for l_default, l_varname in zip(
            node.args.kw_defaults, node.args.kwonlyargs, strict=True
        ):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f.__kwdefaults__[l_varname.arg]

    # CPython's compiler treats known imports differently than normal globals,
    # so check if we use attributes from globals that are modules (if so, we
    # import them explicitly and redundantly in the exec below).
    referenced_modules = [
        (local_name, module)
        for attr in extract_all_attributes(node)
        if (
            isinstance(attr.value, ast.Name)
            and (local_name := attr.value.id)
            and inspect.ismodule(module := f_globals.get(local_name))
        )
    ]

    if not f_code.co_freevars and not referenced_modules:
        compiled = eval(source, f_globals)
    else:
        if f_code.co_freevars:
            # We have to reconstruct a local closure. The closure will have
            # the same values as the original function, although this is not
            # required for source/bytecode equality.
            f_globals |= {
                f"__lc{i}": c.cell_contents for i, c in enumerate(f.__closure__)
            }
            captures = [f"{name}=__lc{i}" for i, name in enumerate(f_code.co_freevars)]
            capture_str = ";".join(captures) + ";"
        else:
            capture_str = ""
        if referenced_modules:
            # We add import statements for all referenced modules, since that
            # influences the compiled code. The assumption is that these modules
            # were explicitly imported, not assigned, in the source - if not,
            # this may/will give a different compilation result.
            global _module_map
            if len(_module_map) != len(sys.modules):  # pragma: no branch
                _module_map = {id(module): name for name, module in sys.modules.items()}
            imports = [
                (module_name, local_name)
                for local_name, module in referenced_modules
                if (module_name := _module_map.get(id(module))) is not None
            ]
            import_fragments = [f"{name} as {asname}" for name, asname in set(imports)]
            import_str = f"import {','.join(import_fragments)}\n"
        else:
            import_str = ""
        exec_str = (
            f"{import_str}def __construct_lambda(): {capture_str} return ({source})"
        )
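        # For example (illustrative), for a lambda `lambda x: math.sqrt(x) + n` with
        # free variable `n` and referenced module `math`, exec_str becomes:
        #   import math as math
        #   def __construct_lambda(): n=__lc0; return (lambda x: math.sqrt(x) + n)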
        exec(exec_str, f_globals)
        compiled = f_globals["__construct_lambda"]()

    return compiled


def _lambda_code_matches_node(f, node):
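    # Recompile `node` in a context mimicking f's and compare function keys,
    # normalizing known-equivalent bytecode differences if the first comparison
    # fails.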
    try:
        compiled = _mimic_lambda_from_node(f, node)
    except (NameError, SyntaxError):  # pragma: no cover # source is generated from ast
        return False
    if _function_key(f) == _function_key(compiled):
        return True
    # Try harder
    compiled.__code__ = _normalize_code(f, compiled)
    return _function_key(f) == _function_key(compiled)


def _check_unknown_perfectly_aligned_lambda(candidate):
    # This is a monkeypatch point for our self-tests, to make unknown
    # lambdas raise.
    pass


def _lambda_description(f, leeway=50, *, fail_if_confused_with_perfect_candidate=False):
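    # Returns a normalized "lambda <args>: <body>" description of f, or a
    # description with body "<unknown>" if no lambda node within `leeway` lines
    # of f's reported source position compiles to matching code.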
    if hasattr(f, "__wrapped_target"):
        f = f.__wrapped_target

    # You might be wondering how a lambda can have a return-type annotation?
    # The answer is that we add it at runtime, in new_given_signature(), since
    # we do support strange choices such as applying @given() to a lambda.
    sig = inspect.signature(f)
    assert sig.return_annotation in (Parameter.empty, None), sig

    # Using pytest-xdist on Python 3.13, there's an entry in the linecache for
    # file "<string>", which then returns nonsense to getsource. Discard it.
    linecache.cache.pop("<string>", None)

    def format_lambda(body):
        # The signature is more informative than the corresponding ast.unparse
        # output in the case of default argument values, so add the signature
        # to the unparsed body.
        return (
            f"lambda {str(sig)[1:-1]}: {body}" if sig.parameters else f"lambda: {body}"
        )

    if_confused = format_lambda("<unknown>")

    try:
        source_lines, lineno0 = inspect.findsource(f)
        source_lines = tuple(source_lines)  # make it hashable
    except OSError:
        return if_confused

    try:
        all_lambdas = AST_LAMBDAS_CACHE[source_lines]
    except KeyError:
        # The source isn't already parsed, so we try to shortcut by parsing just
        # the local block. If that fails to produce a code-identical lambda,
        # fall through to the full parse.
        local_lines = inspect.getblock(source_lines[lineno0:])
        local_block = textwrap.dedent("".join(local_lines))
        # The fairly common ".map(lambda x: ...)" case. This partial block
        # isn't valid syntax, but it might be if we remove the leading ".".
        local_block = local_block.removeprefix(".")

        try:
            local_tree = ast.parse(local_block)
        except SyntaxError:
            pass
        else:
            local_lambdas = extract_all_lambdas(local_tree)
            for candidate in local_lambdas:
                if reflection.ast_arguments_matches_signature(
                    candidate.args, sig
                ) and _lambda_code_matches_node(f, candidate):
                    return format_lambda(ast.unparse(candidate.body))

        # Local parse failed or didn't produce a match, go ahead with the full parse
        try:
            tree = ast.parse("".join(source_lines))
        except SyntaxError:
            all_lambdas = []
        else:
            all_lambdas = extract_all_lambdas(tree)
        AST_LAMBDAS_CACHE[source_lines] = all_lambdas

    aligned_lambdas = []
    for candidate in all_lambdas:
        if (
            candidate.lineno - leeway <= lineno0 + 1 <= candidate.lineno + leeway
            and reflection.ast_arguments_matches_signature(candidate.args, sig)
        ):
            aligned_lambdas.append(candidate)

    aligned_lambdas.sort(key=lambda c: abs(lineno0 + 1 - c.lineno))
    for candidate in aligned_lambdas:
        if _lambda_code_matches_node(f, candidate):
            return format_lambda(ast.unparse(candidate.body))

    # None of the aligned lambdas match perfectly in generated code.
    if aligned_lambdas and aligned_lambdas[0].lineno == lineno0 + 1:
        _check_unknown_perfectly_aligned_lambda(aligned_lambdas[0])

    return if_confused


def lambda_description(f):
    """
    Returns a syntactically-valid expression describing `f`. This is often, but
    not always, the exact lambda definition string which appears in the source code.
    The difference arises because we parse the lambda source into an ast and then
    return the result of `ast.unparse`, which may differ from the original in
    whitespace, double vs single quotes, etc.

    Returns a string indicating an unknown body if the parsing gets confused in any way.
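    For example (illustrative), a lambda written as ``lambda x,y:x+y`` would be
    described as "lambda x, y: x + y", or as "lambda x, y: <unknown>" if its
    source cannot be located.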
402 """
403 try:
404 return LAMBDA_DESCRIPTION_CACHE[f]
405 except KeyError:
406 pass
407
408 key = _function_key(f, bounded_size=True)
409 location = (f.__code__.co_filename, f.__code__.co_firstlineno)
410 try:
411 description, failed_locations = LAMBDA_DIGEST_DESCRIPTION_CACHE[key]
412 except KeyError:
413 failed_locations = set()
414 else:
415 # We got a hit in the digests cache, but only use it if either it has
416 # a good (known) description, or if it is unknown but we already tried
417 # to parse its exact source location before.
418 if "<unknown>" not in description or location in failed_locations:
419 # use the cached result
420 LAMBDA_DESCRIPTION_CACHE[f] = description
421 return description
422
423 description = _lambda_description(f)
424 LAMBDA_DESCRIPTION_CACHE[f] = description
425 if "<unknown>" in description:
426 failed_locations.add(location)
427 else:
428 failed_locations.clear() # we have a good description now
429 LAMBDA_DIGEST_DESCRIPTION_CACHE[key] = description, failed_locations
430 return description