1# This file is part of Hypothesis, which may be found at
2# https://github.com/HypothesisWorks/hypothesis/
3#
4# Copyright the Hypothesis Authors.
5# Individual contributors are listed in AUTHORS.rst and the git log.
6#
7# This Source Code Form is subject to the terms of the Mozilla Public License,
8# v. 2.0. If a copy of the MPL was not distributed with this file, You can
9# obtain one at https://mozilla.org/MPL/2.0/.
10
11import ast
12import hashlib
13import inspect
14import linecache
15import sys
16import textwrap
17from collections.abc import MutableMapping
18from inspect import Parameter
19from typing import Any, Callable, Optional
20from weakref import WeakKeyDictionary
21
22from hypothesis.internal import reflection
23from hypothesis.internal.cache import LRUCache
24
# We have several levels of caching for lambda descriptions.
# * LAMBDA_DESCRIPTION_CACHE maps a lambda f to its description _lambda_description(f).
#   Note that _lambda_description(f) may not be identical to f as it appears in the
#   source code file.
# * LAMBDA_DIGEST_DESCRIPTION_CACHE maps _function_key(f) to a pair consisting of
#   _lambda_description(f) and the set of source locations at which that
#   description came back as unknown.
#   _function_key implements something close to "ast equality":
#   two syntactically identical (minus whitespace etc) lambdas appearing in
#   different files have the same key. Cache hits here provide a fast path which
#   avoids ast-parsing syntactic lambdas we've seen before. Two lambdas with the
#   same _function_key will not have different _lambda_descriptions - if
#   they do, that's a bug here.
# * AST_LAMBDAS_CACHE maps the source code lines of a file to a list of the
#   lambdas found in that source code. A cache hit here avoids reparsing the ast.
# Keyed weakly by the function object itself, so entries do not keep the
# described lambdas alive.
LAMBDA_DESCRIPTION_CACHE: MutableMapping[Callable, str] = WeakKeyDictionary()
# Keyed by _function_key(f, bounded_size=True); each value is a pair of
# (description, set of (filename, first line number) locations that failed).
LAMBDA_DIGEST_DESCRIPTION_CACHE: LRUCache[
    tuple[Any, ...], tuple[str, set[tuple[str, int]]]
] = LRUCache(max_size=1000)
# Keyed by the full tuple of source lines of a file.
AST_LAMBDAS_CACHE: LRUCache[tuple[str, ...], list[ast.Lambda]] = LRUCache(max_size=100)
41
42
def extract_all_lambdas(tree):
    """Return the ``ast.Lambda`` nodes found while visiting ``tree``.

    Nodes are listed in visiting order, so an outer lambda comes before any
    lambda nested in its body. Only the *body* of a lambda is searched for
    further lambdas; its argument defaults are not descended into.
    """

    class _LambdaCollector(ast.NodeVisitor):
        def __init__(self):
            self.found = []

        def visit_Lambda(self, node):
            self.found.append(node)
            # Continue into the body so that nested lambdas are collected too.
            self.visit(node.body)

    collector = _LambdaCollector()
    collector.visit(tree)
    return collector.found
54
55
def extract_all_attributes(tree):
    """Return the ``ast.Attribute`` nodes found while visiting ``tree``.

    For a dotted chain such as ``a.b.c`` the outermost attribute access is
    listed first, followed by the ones nested in its ``value``.
    """

    class _AttributeCollector(ast.NodeVisitor):
        def __init__(self):
            self.seen = []

        def visit_Attribute(self, node):
            self.seen.append(node)
            # Keep going through the object expression the attribute is read from.
            self.visit(node.value)

    walker = _AttributeCollector()
    walker.visit(tree)
    return walker.seen
66
67
68def _function_key(f, *, bounded_size=False):
69 """Returns a digest that differentiates functions that have different sources.
70
71 Either a function or a code object may be passed. If code object, default
72 arg/kwarg values are not recoverable - this is the best we can do, and is
73 sufficient for the use case of comparing nested lambdas.
74 """
75 try:
76 code = f.__code__
77 defaults_repr = repr((f.__defaults__, f.__kwdefaults__))
78 except AttributeError:
79 code = f
80 defaults_repr = ()
81 consts_repr = repr(code.co_consts)
82 if bounded_size:
83 # Compress repr to avoid keeping arbitrarily large strings pinned as cache
84 # keys. We don't do this unconditionally because hashing takes time, and is
85 # not necessary if the key is used just for comparison (and is not stored).
86 if len(consts_repr) > 48:
87 consts_repr = hashlib.sha384(consts_repr.encode()).digest()
88 if len(defaults_repr) > 48:
89 defaults_repr = hashlib.sha384(defaults_repr.encode()).digest()
90 return (
91 consts_repr,
92 defaults_repr,
93 code.co_argcount,
94 code.co_kwonlyargcount,
95 code.co_code,
96 code.co_names,
97 code.co_varnames,
98 code.co_freevars,
99 code.co_name,
100 )
101
102
class _op:
    # Opcodes, from dis.opmap. These may change between major versions.
    # The values are hard-coded here rather than looked up at runtime; they are
    # only used to build the table of equivalent opcode sequences in
    # _normalize_code.
    NOP = 9
    LOAD_FAST = 85
    LOAD_FAST_LOAD_FAST = 88
    LOAD_FAST_BORROW = 86
    LOAD_FAST_BORROW_LOAD_FAST_BORROW = 87
110
111
def _normalize_code(f, l):
    """Return the code object of ``l``, rewritten towards that of ``f``.

    ``f`` is the reference function and ``l`` the function whose code is
    normalized; both must expose ``__code__``. Two adjustments are made:

    * bytecode: where the two instruction streams differ by one of the known
      equivalent opcode sequences listed in ``transforms``, the sequence from
      ``f`` is spliced into a copy of ``l``'s bytecode;
    * constants: a code object among ``l``'s constants is replaced by the code
      object at the same position in ``f``'s constants when both have the same
      ``_function_key``.

    Neither function is modified; the result is a new code object built with
    ``l.__code__.replace``.
    """
    # A small selection of possible peephole code transformations, based on what
    # is actually seen to differ between compilations in our test suite. Each
    # entry contains two equivalent opcode sequences, plus a condition
    # function called with their respective oparg sequences, which must return
    # true for the transformation to be valid.
    Checker = Callable[[list[int], list[int]], bool]
    transforms: list[tuple[list[int], list[int], Optional[Checker]]] = [
        ([_op.NOP], [], lambda a, b: True),
        (
            [_op.LOAD_FAST, _op.LOAD_FAST],
            [_op.LOAD_FAST_LOAD_FAST],
            # the two separate opargs must equal the high and low 4 bits of
            # the combined instruction's oparg
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
        (
            [_op.LOAD_FAST_BORROW, _op.LOAD_FAST_BORROW],
            [_op.LOAD_FAST_BORROW_LOAD_FAST_BORROW],
            lambda a, b: a == [b[0] >> 4, b[0] & 15],
        ),
    ]
    # augment with converse
    transforms += [
        (
            ops_b,
            ops_a,
            # swap the argument order so the original condition still sees its
            # own (a, b) orientation
            condition and (lambda a, b, condition=condition: condition(b, a)),
        )
        for ops_a, ops_b, condition in transforms
    ]

    # Normalize equivalent code. We assume that each bytecode op is 2 bytes,
    # which is the case since Python 3.6. Since the opcodes values may change
    # between version, there is a risk that a transform may not be equivalent
    # -- even so, the risk of a bad transform producing a false positive is
    # minuscule.
    co_code = list(l.__code__.co_code)
    f_code = list(f.__code__.co_code)

    def alternating(code, i, n):
        # Every second byte starting at i, n items in total: the opcodes when
        # i is an instruction offset, the opargs when i is one past it.
        return code[i : i + 2 * n : 2]

    # Comparison starts at byte offset 2, i.e. the first 2-byte instruction is
    # not examined here (presumably a common prologue op - TODO confirm).
    i = 2
    while i < max(len(co_code), len(f_code)):
        # note that co_code is mutated in loop
        if i < min(len(co_code), len(f_code)) and f_code[i] == co_code[i]:
            i += 2
        else:
            for op1, op2, condition in transforms:
                if (
                    op1 == alternating(f_code, i, len(op1))
                    and op2 == alternating(co_code, i, len(op2))
                    and condition(
                        alternating(f_code, i + 1, len(op1)),
                        alternating(co_code, i + 1, len(op2)),
                    )
                ):
                    break
            else:
                # no point in continuing since the bytecodes are different anyway
                break
            # Splice in the transform and continue
            co_code = (
                co_code[:i] + f_code[i : i + 2 * len(op1)] + co_code[i + 2 * len(op2) :]
            )
            i += 2 * len(op1)

    # Normalize consts, in particular replace any lambda consts with the
    # corresponding const from the template function, IFF they have the same
    # source key.

    f_consts = f.__code__.co_consts
    l_consts = l.__code__.co_consts
    if len(f_consts) == len(l_consts) and any(
        inspect.iscode(l_const) for l_const in l_consts
    ):
        normalized_consts = []
        for f_const, l_const in zip(f_consts, l_consts):
            if (
                inspect.iscode(l_const)
                and inspect.iscode(f_const)
                and _function_key(f_const) == _function_key(l_const)
            ):
                # If the lambdas are compiled from the same source, make them be the
                # same object so that the toplevel lambdas end up equal. Note that
                # default arguments are not available on the code objects. But if the
                # default arguments differ then the lambdas must also differ in other
                # ways, since default arguments are set up from bytecode and constants.
                # I.e., this appears to be safe wrt false positives.
                normalized_consts.append(f_const)
            else:
                normalized_consts.append(l_const)
    else:
        # Nothing to substitute: keep the constants of l unchanged.
        normalized_consts = l_consts

    return l.__code__.replace(
        co_code=bytes(co_code),
        co_consts=tuple(normalized_consts),
    )
210
211
# Maps id(module) to the module's name in sys.modules. Rebuilt by
# _mimic_lambda_from_node whenever its size differs from that of sys.modules.
_module_map: dict[int, str] = {}
213
214
def _mimic_lambda_from_node(f, node):
    """Compile the lambda described by ``node`` the way ``f`` was compiled.

    ``f`` is the function being described and ``node`` an ``ast.Lambda``
    candidate for its source. The unparsed source of ``node`` is evaluated in
    a copy of ``f``'s globals, augmented with ``f``'s default values, its
    closure contents and explicit imports of referenced modules. The newly
    created function is returned; ``f`` and its real globals are not modified.

    Evaluating the generated source may raise ``NameError`` or ``SyntaxError``;
    ``_lambda_code_matches_node`` treats either as "no match".
    """
    # Compile the source (represented by an ast.Lambda node) in a context that
    # as far as possible mimics the context that f was compiled in. If - and
    # only if - this was the source of f then the result is indistinguishable
    # from f itself (to a casual observer such as _function_key).
    f_globals = f.__globals__.copy()
    f_code = f.__code__
    source = ast.unparse(node)

    # Install values for non-literal argument defaults. Thankfully, these are
    # always captured by value - so there is no interaction with the closure.
    if f.__defaults__:
        for f_default, l_default in zip(f.__defaults__, node.args.defaults):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f_default
    if f.__kwdefaults__:  # pragma: no cover
        for l_default, l_varname in zip(node.args.kw_defaults, node.args.kwonlyargs):
            if isinstance(l_default, ast.Name):
                f_globals[l_default.id] = f.__kwdefaults__[l_varname.arg]

    # CPython's compiler treats known imports differently than normal globals,
    # so check if we use attributes from globals that are modules (if so, we
    # import them explicitly and redundantly in the exec below)
    referenced_modules = [
        (local_name, module)
        for attr in extract_all_attributes(node)
        if (
            isinstance(attr.value, ast.Name)
            and (local_name := attr.value.id)
            and inspect.ismodule(module := f_globals.get(local_name))
        )
    ]

    if not f_code.co_freevars and not referenced_modules:
        compiled = eval(source, f_globals)
    else:
        if f_code.co_freevars:
            # We have to reconstruct a local closure. The closure will have
            # the same values as the original function, although this is not
            # required for source/bytecode equality.
            for i, cell in enumerate(f.__closure__):
                try:
                    f_globals[f"__lc{i}"] = cell.cell_contents
                except ValueError:
                    # The cell is still empty (the enclosing scope has not
                    # assigned the variable yet). The value is irrelevant for
                    # the comparison, so any placeholder will do.
                    f_globals[f"__lc{i}"] = None
            captures = [f"{name}=__lc{i}" for i, name in enumerate(f_code.co_freevars)]
            capture_str = ";".join(captures) + ";"
        else:
            capture_str = ""
        if referenced_modules:
            # We add import statements for all referenced modules, since that
            # influences the compiled code. The assumption is that these modules
            # were explicitly imported, not assigned, in the source - if not,
            # this may/will give a different compilation result.
            global _module_map
            if len(_module_map) != len(sys.modules):  # pragma: no branch
                _module_map = {id(module): name for name, module in sys.modules.items()}
            imports = [
                (module_name, local_name)
                for local_name, module in referenced_modules
                if (module_name := _module_map.get(id(module))) is not None
            ]
            import_fragments = [f"{name} as {asname}" for name, asname in set(imports)]
            # A bare "import" with nothing after it is a syntax error, so only
            # emit the statement when at least one module could be resolved.
            import_str = (
                f"import {','.join(import_fragments)}\n" if import_fragments else ""
            )
        else:
            import_str = ""
        exec_str = (
            f"{import_str}def __construct_lambda(): {capture_str} return ({source})"
        )
        exec(exec_str, f_globals)
        compiled = f_globals["__construct_lambda"]()

    return compiled
286
287
def _lambda_code_matches_node(f, node):
    """Tell whether compiling ``node`` reproduces the code of ``f``.

    The candidate is first compared as compiled; if that fails, its code is
    normalized against ``f`` and compared once more. A candidate that cannot
    be compiled at all counts as a mismatch.
    """
    try:
        candidate_fn = _mimic_lambda_from_node(f, node)
    except (NameError, SyntaxError):  # pragma: no cover # source is generated from ast
        return False

    target_key = _function_key(f)
    if target_key == _function_key(candidate_fn):
        return True

    # Try harder
    candidate_fn.__code__ = _normalize_code(f, candidate_fn)
    return target_key == _function_key(candidate_fn)
298
299
300def _check_unknown_perfectly_aligned_lambda(candidate):
301 # This is a monkeypatch point for our self-tests, to make unknown
302 # lambdas raise.
303 pass
304
305
306def _lambda_description(f, leeway=50, *, fail_if_confused_with_perfect_candidate=False):
307 if hasattr(f, "__wrapped_target"):
308 f = f.__wrapped_target
309
310 # You might be wondering how a lambda can have a return-type annotation?
311 # The answer is that we add this at runtime, in new_given_signature(),
312 # and we do support strange choices as applying @given() to a lambda.
313 sig = inspect.signature(f)
314 assert sig.return_annotation in (Parameter.empty, None), sig
315
316 # Using pytest-xdist on Python 3.13, there's an entry in the linecache for
317 # file "<string>", which then returns nonsense to getsource. Discard it.
318 linecache.cache.pop("<string>", None)
319
320 def format_lambda(body):
321 # The signature is more informative than the corresponding ast.unparse
322 # output in the case of default argument values, so add the signature
323 # to the unparsed body
324 return (
325 f"lambda {str(sig)[1:-1]}: {body}" if sig.parameters else f"lambda: {body}"
326 )
327
328 if_confused = format_lambda("<unknown>")
329
330 try:
331 source_lines, lineno0 = inspect.findsource(f)
332 source_lines = tuple(source_lines) # make it hashable
333 except OSError:
334 return if_confused
335
336 try:
337 all_lambdas = AST_LAMBDAS_CACHE[source_lines]
338 except KeyError:
339 # The source isn't already parsed, so we try to shortcut by parsing just
340 # the local block. If that fails to produce a code-identical lambda,
341 # fall through to the full parse.
342 local_lines = inspect.getblock(source_lines[lineno0:])
343 local_block = textwrap.dedent("".join(local_lines))
344 # The fairly common ".map(lambda x: ...)" case. This partial block
345 # isn't valid syntax, but it might be if we remove the leading ".".
346 local_block = local_block.removeprefix(".")
347
348 try:
349 local_tree = ast.parse(local_block)
350 except SyntaxError:
351 pass
352 else:
353 local_lambdas = extract_all_lambdas(local_tree)
354 for candidate in local_lambdas:
355 if reflection.ast_arguments_matches_signature(
356 candidate.args, sig
357 ) and _lambda_code_matches_node(f, candidate):
358 return format_lambda(ast.unparse(candidate.body))
359
360 # Local parse failed or didn't produce a match, go ahead with the full parse
361 try:
362 tree = ast.parse("".join(source_lines))
363 except SyntaxError:
364 all_lambdas = []
365 else:
366 all_lambdas = extract_all_lambdas(tree)
367 AST_LAMBDAS_CACHE[source_lines] = all_lambdas
368
369 aligned_lambdas = []
370 for candidate in all_lambdas:
371 if (
372 candidate.lineno - leeway <= lineno0 + 1 <= candidate.lineno + leeway
373 and reflection.ast_arguments_matches_signature(candidate.args, sig)
374 ):
375 aligned_lambdas.append(candidate)
376
377 aligned_lambdas.sort(key=lambda c: abs(lineno0 + 1 - c.lineno))
378 for candidate in aligned_lambdas:
379 if _lambda_code_matches_node(f, candidate):
380 return format_lambda(ast.unparse(candidate.body))
381
382 # None of the aligned lambdas match perfectly in generated code.
383 if aligned_lambdas and aligned_lambdas[0].lineno == lineno0 + 1:
384 _check_unknown_perfectly_aligned_lambda(aligned_lambdas[0])
385
386 return if_confused
387
388
def lambda_description(f):
    """
    Returns a syntactically-valid expression describing `f`. This is often, but
    not always, the exact lambda definition string which appears in the source code.
    The difference comes from parsing the lambda ast into `tree` and then returning
    the result of `ast.unparse(tree)`, which may differ in whitespace, double vs
    single quotes, etc.

    Returns a string indicating an unknown body if the parsing gets confused in any way.

    Results are memoized per function object and per function digest, so
    repeated calls avoid recomputing the description.
    """
    try:
        return LAMBDA_DESCRIPTION_CACHE[f]
    except KeyError:
        pass

    digest = _function_key(f, bounded_size=True)
    code = f.__code__
    where = (code.co_filename, code.co_firstlineno)

    try:
        cached_entry = LAMBDA_DIGEST_DESCRIPTION_CACHE[digest]
    except KeyError:
        failed_at = set()
    else:
        description, failed_at = cached_entry
        # A digest hit is only trusted when it carries a good (known)
        # description, or when it is unknown but this exact source location
        # has already been tried before.
        if where in failed_at or "<unknown>" not in description:
            LAMBDA_DESCRIPTION_CACHE[f] = description
            return description

    description = _lambda_description(f)
    LAMBDA_DESCRIPTION_CACHE[f] = description
    if "<unknown>" not in description:
        failed_at.clear()  # we have a good description now
    else:
        failed_at.add(where)
    LAMBDA_DIGEST_DESCRIPTION_CACHE[digest] = (description, failed_at)
    return description