1"""Synchronization decorators and calling-convention markers/bridges.
2
3Provides ``synchronized`` for thread and async locking, ``mark_as_sync``
4and ``mark_as_async`` for declaring the effective calling convention of a
5wrapped callable (without converting it), and ``async_to_sync`` /
6``sync_to_async`` for bridging between the two.
7"""
8
9import asyncio
10import sys
11from functools import partial
12from inspect import (
13 CO_ASYNC_GENERATOR,
14 CO_COROUTINE,
15 CO_GENERATOR,
16 CO_ITERABLE_COROUTINE,
17 iscoroutinefunction,
18)
19from threading import Lock, RLock
20
21from .__wrapt__ import BoundFunctionWrapper, CallableObjectProxy, FunctionWrapper
22from .decorators import decorator
23
24# Calling-convention marker wrappers. These manipulate __code__.co_flags
25# so that inspect.iscoroutinefunction() reports the intended calling
26# convention, which lets stdlib code and the synchronized() decorator
27# auto-select the correct sync or async wrapping behaviour even when
28# stacked decorators change the effective convention (for example an
29# inner decorator that invokes an async def via asyncio.run()).
30
31
class _SyncCodeProxy(CallableObjectProxy):
    """Proxy around a code object that rewrites ``co_flags`` so the code
    reads as synchronous.

    ``generator`` is tri-state: ``True`` forces ``CO_GENERATOR`` on,
    ``False`` forces it off, and anything else (``None`` by default)
    selects the automatic behaviour described on ``co_flags``.
    """

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        self._self_generator = generator

    @property
    def co_flags(self):
        """Return the wrapped flags with every async-related bit removed."""
        source_flags = self.__wrapped__.co_flags

        # A sync callable is neither a coroutine function, an async
        # generator, nor a types.coroutine-style iterable coroutine.
        async_bits = CO_COROUTINE | CO_ASYNC_GENERATOR | CO_ITERABLE_COROUTINE
        sync_flags = source_flags & ~async_bits

        mode = self._self_generator
        if mode is True:
            return sync_flags | CO_GENERATOR
        if mode is False:
            return sync_flags & ~CO_GENERATOR

        # Automatic mode: an async generator becomes a sync generator;
        # otherwise CO_GENERATOR stays exactly as the wrapped code had it.
        if source_flags & CO_ASYNC_GENERATOR:
            return sync_flags | CO_GENERATOR
        return sync_flags
55
56
class _SyncFunctionSurrogate(CallableObjectProxy):
    """Stand-in for a function object whose ``__code__`` is presented
    through ``_SyncCodeProxy``; every other attribute is proxied to the
    wrapped function."""

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Tri-state generator setting, handed to the code proxy unchanged.
        self._self_generator = generator

    @property
    def __code__(self):
        real_code = self.__wrapped__.__code__
        return _SyncCodeProxy(real_code, self._self_generator)
66
67
class _BoundSyncFunctionWrapper(BoundFunctionWrapper):
    """Bound-method counterpart of ``_SyncFunctionWrapper``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Authoritative marker read by _synchronized_is_async_callable() to
        # stop it descending into the (possibly async) wrapped callable.
        self._self_is_not_coroutine = True

    @property
    def __func__(self):
        # Expose the underlying function through a surrogate whose
        # __code__ flags read as sync, reusing the generator setting
        # stored on the parent (unbound) wrapper.
        underlying = self.__wrapped__.__func__
        generator_mode = self._self_parent._self_generator
        return _SyncFunctionSurrogate(underlying, generator_mode)
79
80
class _SyncFunctionWrapper(FunctionWrapper):
    """Function wrapper whose ``__code__`` flags read as synchronous.

    Binding produces ``_BoundSyncFunctionWrapper`` instances.
    """

    __bound_function_wrapper__ = _BoundSyncFunctionWrapper

    def __init__(self, wrapped, wrapper, generator=None):
        super().__init__(wrapped, wrapper)
        # Tri-state generator setting consumed by _SyncCodeProxy.
        self._self_generator = generator
        # Authoritative "this layer is sync" marker checked by
        # _synchronized_is_async_callable().
        self._self_is_not_coroutine = True

    @property
    def __code__(self):
        real_code = self.__wrapped__.__code__
        return _SyncCodeProxy(real_code, self._self_generator)
93
94
class _AsyncCodeProxy(CallableObjectProxy):
    """Proxy around a code object that rewrites ``co_flags`` so the code
    reads as asynchronous.

    ``generator`` is tri-state: ``True`` reports an async generator,
    ``False`` reports a coroutine function, and anything else (``None``
    by default) picks one of the two from the wrapped flags.
    """

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        self._self_generator = generator

    @property
    def co_flags(self):
        """Return the wrapped flags with exactly one async bit asserted."""
        source_flags = self.__wrapped__.co_flags

        # Clear all four calling-convention bits, then re-add the single
        # bit that applies.
        convention_bits = (
            CO_GENERATOR | CO_COROUTINE | CO_ITERABLE_COROUTINE | CO_ASYNC_GENERATOR
        )
        base_flags = source_flags & ~convention_bits

        mode = self._self_generator
        if mode is True:
            as_generator = True
        elif mode is False:
            as_generator = False
        else:
            # Automatic mode: any kind of generator input (sync or async)
            # is reported as an async generator.
            as_generator = bool(source_flags & (CO_GENERATOR | CO_ASYNC_GENERATOR))

        return base_flags | (CO_ASYNC_GENERATOR if as_generator else CO_COROUTINE)
120
121
class _AsyncFunctionSurrogate(CallableObjectProxy):
    """Stand-in for a function object whose ``__code__`` is presented
    through ``_AsyncCodeProxy``; every other attribute is proxied to the
    wrapped function."""

    def __init__(self, wrapped, generator=None):
        super().__init__(wrapped)
        # Tri-state generator setting, handed to the code proxy unchanged.
        self._self_generator = generator

    @property
    def __code__(self):
        real_code = self.__wrapped__.__code__
        return _AsyncCodeProxy(real_code, self._self_generator)
131
132
class _BoundAsyncFunctionWrapper(BoundFunctionWrapper):
    """Bound-method counterpart of ``_AsyncFunctionWrapper``."""

    @property
    def __func__(self):
        # Expose the underlying function through a surrogate whose
        # __code__ flags read as async, reusing the generator setting
        # stored on the parent (unbound) wrapper.
        underlying = self.__wrapped__.__func__
        generator_mode = self._self_parent._self_generator
        return _AsyncFunctionSurrogate(underlying, generator_mode)
140
141
class _AsyncFunctionWrapper(FunctionWrapper):
    """Function wrapper whose ``__code__`` flags read as asynchronous.

    Binding produces ``_BoundAsyncFunctionWrapper`` instances.
    """

    __bound_function_wrapper__ = _BoundAsyncFunctionWrapper

    def __init__(self, wrapped, wrapper, generator=None):
        super().__init__(wrapped, wrapper)
        # Tri-state generator setting consumed by _AsyncCodeProxy.
        self._self_generator = generator

    @property
    def __code__(self):
        real_code = self.__wrapped__.__code__
        return _AsyncCodeProxy(real_code, self._self_generator)
153
154
def mark_as_sync(wrapped=None, /, *, generator=None):
    """Declare that a callable follows the synchronous calling convention.

    The result is a pass-through wrapper: calls are forwarded untouched,
    but the wrapper's code flags are presented so that
    `inspect.iscoroutinefunction()` reports False even when the
    underlying callable was written as `async def`. This is intended for
    stacks where another decorator has already turned an async function
    into a synchronous one (for example via `asyncio.run()`).

    Usable bare (`@mark_as_sync`) or with arguments
    (`@mark_as_sync(generator=...)`).

    `generator` controls the sync generator bit (`CO_GENERATOR`):

    - `None` (default): automatic. An async generator input is reported
      as a sync generator; otherwise CO_GENERATOR is passed through as
      found on the input.
    - `True`: CO_GENERATOR is forced on.
    - `False`: CO_GENERATOR is forced off, even if the input had it set.

    Whatever `generator` is, CO_COROUTINE, CO_ASYNC_GENERATOR and
    CO_ITERABLE_COROUTINE are always cleared.
    """

    def _passthrough(wrapped, instance, args, kwargs):
        return wrapped(*args, **kwargs)

    def _decorator(wrapped):
        return _SyncFunctionWrapper(wrapped, _passthrough, generator=generator)

    # Called with keyword arguments only: hand back the decorator itself.
    return _decorator if wrapped is None else _decorator(wrapped)
186
187
def mark_as_async(wrapped=None, /, *, generator=None):
    """Mark a callable as asynchronous from the perspective of calling
    convention detection. The returned wrapper is a pass-through that
    reports `inspect.iscoroutinefunction()` as True regardless of whether
    the underlying callable is declared `async def`. Useful when a
    stacked decorator returns a coroutine from a plain `def` wrapper.

    The wrapper only changes what is *reported*; it does not convert the
    callable. Whatever the wrapped callable returns (a coroutine, an
    async generator, ...) is handed back to the caller unchanged, so the
    caller awaits or iterates it exactly as it would the original.

    Usable bare (`@mark_as_async`) or with arguments
    (`@mark_as_async(generator=...)`).

    The `generator` keyword chooses between coroutine function and
    async generator reporting. Tri-state:

    - `None` (default): auto. If the input was a sync or async
      generator, the wrapper reports as an async generator
      (`CO_ASYNC_GENERATOR`); otherwise it reports as a coroutine
      function (`CO_COROUTINE`).
    - `True`: force async generator reporting (`CO_ASYNC_GENERATOR` set,
      `CO_COROUTINE` cleared). These two flags are mutually exclusive at
      the CPython code-object level.
    - `False`: force coroutine function reporting (`CO_COROUTINE` set,
      `CO_ASYNC_GENERATOR` cleared).

    CO_GENERATOR and CO_ITERABLE_COROUTINE are always cleared (the
    async path does not use either).
    """

    def _decorator(wrapped):
        def _wrapper(wrapped, instance, args, kwargs):
            # Deliberately a plain `def`. An `async def` pass-through
            # would wrap the result in a second coroutine, so awaiting
            # the call would yield the inner, never-awaited coroutine
            # (or async generator) instead of its result.
            return wrapped(*args, **kwargs)

        return _AsyncFunctionWrapper(wrapped, _wrapper, generator=generator)

    if wrapped is None:
        return _decorator
    return _decorator(wrapped)
221
222
def async_to_sync(wrapped):
    """Bridge an async callable into a synchronously callable one.

    Every call creates the coroutine by calling `wrapped` and drives it
    to completion with `asyncio.run()`, returning its result. The
    returned wrapper reports as synchronous under
    `inspect.iscoroutinefunction()`. Naming follows the asgiref
    convention.
    """

    def _run_to_completion(wrapped, instance, args, kwargs):
        pending = wrapped(*args, **kwargs)
        return asyncio.run(pending)

    return _SyncFunctionWrapper(wrapped, _run_to_completion)
234
235
def sync_to_async(wrapped):
    """Bridge a sync callable into an awaitable one.

    Every call hands the synchronous work to the running loop's default
    executor via `loop.run_in_executor()` and awaits the outcome. The
    returned wrapper reports as asynchronous under
    `inspect.iscoroutinefunction()`. Naming follows the asgiref
    convention.
    """

    async def _run_in_executor(wrapped, instance, args, kwargs):
        running_loop = asyncio.get_running_loop()
        # run_in_executor() only forwards positional arguments, so the
        # keyword arguments are bound up front with partial().
        bound_call = partial(wrapped, *args, **kwargs)
        outcome = running_loop.run_in_executor(None, bound_call)
        return await outcome

    return _AsyncFunctionWrapper(wrapped, _run_in_executor)
248
249
def _synchronized_is_async_lock(obj):
    """Return True when `obj.acquire` is a coroutine function.

    A missing `acquire` attribute is treated as None, which is not a
    coroutine function, so the result is False.
    """
    acquire = getattr(obj, "acquire", None)
    return iscoroutinefunction(acquire)
252
253
def _synchronized_is_async_callable(obj):
    """Decide whether `obj` should be treated as an async callable.

    The `__wrapped__` chain is followed outwards-in. The answer is True
    as soon as a layer is a coroutine function. It is False when a layer
    carries a truthy `_self_is_not_coroutine` attribute (checked before
    the coroutine test, so a sync marker wins over an async inner layer)
    or when the chain ends. `classmethod` / `staticmethod` objects are
    stepped through via `__wrapped__`, falling back to `__func__`.

    Raises ValueError if the chain revisits an object or grows to the
    interpreter recursion limit (same guard as `inspect.unwrap()`).
    """

    seen = {id(obj): obj}
    max_layers = sys.getrecursionlimit()

    def _descend(layer):
        # Record the next layer, refusing cycles and runaway chains.
        key = id(layer)
        if key in seen or len(seen) >= max_layers:
            raise ValueError("wrapper loop when unwrapping {!r}".format(obj))
        seen[key] = layer
        return layer

    current = obj
    while True:
        if isinstance(current, (classmethod, staticmethod)):
            unwrapped = getattr(current, "__wrapped__", None)
            current = _descend(current.__func__ if unwrapped is None else unwrapped)
            continue

        # An explicit sync marker is authoritative: stop before looking
        # at anything it wraps.
        if getattr(current, "_self_is_not_coroutine", False):
            return False

        if iscoroutinefunction(current):
            return True

        inner = getattr(current, "__wrapped__", None)
        if inner is None or inner is current:
            return False
        current = _descend(inner)
292
293
294# Decorator for implementing thread synchronization. It can be used as a
295# decorator, in which case the synchronization context is determined by
296# what type of function is wrapped, or it can also be used as a context
297# manager, where the user needs to supply the correct synchronization
298# context. It is also possible to supply an object which appears to be a
299# synchronization primitive of some sort, by virtue of having release()
300# and acquire() methods. In that case that will be used directly as the
301# synchronization primitive without creating a separate lock against the
302# derived or supplied context.
303
304
def synchronized(wrapped):
    """Depending on the nature of the `wrapped` object, will either return a
    decorator which can be used to wrap a function or method, or a context
    manager, both of which will act accordingly depending on how used, to
    synchronize access to calling of the wrapped function, or the block of
    code within the context manager.

    If `wrapped` has both `acquire` and `release` attributes (for example
    a threading Lock, RLock, Semaphore or Condition, or any custom object
    following that protocol) it is used directly as the synchronization
    primitive. Otherwise a lock is created automatically, attached to the
    context (the bound instance, or the wrapped object itself when there
    is no instance) and used as the synchronization primitive.

    Async functions are supported: if the wrapped callable is an async
    function, an `asyncio.Lock` is created for the context and the wrapper
    awaits the lock. The returned object also exposes `__aenter__` and
    `__aexit__` so it can be used with `async with` to synchronise a block
    of code using an independent per-context `asyncio.Lock`. If an object
    whose `acquire` is a coroutine function (such as an `asyncio.Lock`)
    is supplied directly, the returned decorator and context manager will
    use it via the async protocol; on leaving an `async with` block the
    result of `release()` is awaited when it is awaitable, so locks with
    either a plain or a coroutine `release` are released properly.
    """

    # Determine if being passed an object which is a synchronization
    # primitive. We can't check by type for Lock, RLock, Semaphore etc,
    # as the means of creating them isn't the type. Therefore use the
    # existence of acquire() and release() methods. This is more
    # extensible anyway as it allows custom synchronization mechanisms.

    if hasattr(wrapped, "acquire") and hasattr(wrapped, "release"):
        # We remember what the original lock is and then return a new
        # decorator which accesses and locks it. When returning the new
        # decorator we wrap it with an object proxy so we can override
        # the context manager methods in case it is being used to wrap
        # synchronized statements with a 'with' statement.

        lock = wrapped

        if _synchronized_is_async_lock(lock):
            # Imported locally as it is only needed on this branch.
            from inspect import isawaitable

            @decorator
            async def _synchronized(wrapped, instance, args, kwargs):
                async with lock:
                    return await wrapped(*args, **kwargs)

            class _AsyncSynchronizedLockProxy(CallableObjectProxy):

                async def __aenter__(self):
                    await lock.acquire()
                    return lock

                async def __aexit__(self, *args):
                    # asyncio.Lock.release() is a plain method, but a
                    # custom lock may implement release() as a coroutine.
                    # Await it in that case, otherwise the coroutine would
                    # be discarded and the lock never released.
                    released = lock.release()
                    if isawaitable(released):
                        await released

            return _AsyncSynchronizedLockProxy(wrapped=_synchronized)

        @decorator
        def _synchronized(wrapped, instance, args, kwargs):
            # Execute the wrapped function while the original supplied
            # lock is held.

            with lock:
                return wrapped(*args, **kwargs)

        class _SynchronizedLockProxy(CallableObjectProxy):

            def __enter__(self):
                lock.acquire()
                return lock

            def __exit__(self, *args):
                lock.release()

        return _SynchronizedLockProxy(wrapped=_synchronized)

    # Following only apply when the lock is being created automatically
    # based on the context of what was supplied. In this case we supply
    # a final decorator, but need to use FunctionWrapper directly as we
    # want to derive from it to add context manager methods in case it is
    # being used to wrap synchronized statements with a 'with' statement.

    def _synchronized_lock(context):
        # Attempt to retrieve the lock for the specific context.

        lock = vars(context).get("_synchronized_lock", None)

        if lock is None:
            # There is no existing lock defined for the context we
            # are dealing with so we need to create one. This needs
            # to be done in a way to guarantee there is only one
            # created, even if multiple threads try and create it at
            # the same time. We can't always use the setdefault()
            # method on the __dict__ for the context. This is the
            # case where the context is a class, as __dict__ is
            # actually a dictproxy. What we therefore do is use a
            # meta lock on this wrapper itself, to control the
            # creation and assignment of the lock attribute against
            # the context.

            with synchronized._synchronized_meta_lock:
                # We need to check again for whether the lock we want
                # exists in case two threads were trying to create it
                # at the same time and were competing to create the
                # meta lock.

                lock = vars(context).get("_synchronized_lock", None)

                if lock is None:
                    lock = RLock()
                    setattr(context, "_synchronized_lock", lock)

        return lock

    def _synchronized_async_lock(context):
        # Per-context asyncio.Lock, created lazily on first use. Created
        # under the shared meta lock so creation is safe across threads;
        # the meta lock is never held across an await. asyncio.Lock is
        # not reentrant.

        lock = vars(context).get("_synchronized_async_lock", None)

        if lock is None:
            with synchronized._synchronized_meta_lock:
                lock = vars(context).get("_synchronized_async_lock", None)

                if lock is None:
                    lock = asyncio.Lock()
                    setattr(context, "_synchronized_async_lock", lock)

        return lock

    def _synchronized_wrapper(wrapped, instance, args, kwargs):
        # Execute the wrapped function while the lock for the
        # desired context is held. If instance is None then the
        # wrapped function is used as the context.

        with _synchronized_lock(instance if instance is not None else wrapped):
            return wrapped(*args, **kwargs)

    async def _synchronized_async_wrapper(wrapped, instance, args, kwargs):
        # Async counterpart of _synchronized_wrapper: same choice of
        # context, but using the per-context asyncio.Lock.

        async with _synchronized_async_lock(
            instance if instance is not None else wrapped
        ):
            return await wrapped(*args, **kwargs)

    class _SynchronizedFunctionWrapper(FunctionWrapper):
        # When used as a context manager the lock context is the wrapped
        # object itself. The lock in use is remembered on the wrapper so
        # the matching exit method releases the same lock.

        def __enter__(self):
            self._self_lock = _synchronized_lock(self.__wrapped__)
            self._self_lock.acquire()
            return self._self_lock

        def __exit__(self, *args):
            self._self_lock.release()

        async def __aenter__(self):
            self._self_async_lock = _synchronized_async_lock(self.__wrapped__)
            await self._self_async_lock.acquire()
            return self._self_async_lock

        async def __aexit__(self, *args):
            self._self_async_lock.release()

    # Pick the async or sync wrapper from the effective calling
    # convention of what is being decorated.

    if _synchronized_is_async_callable(wrapped):
        return _SynchronizedFunctionWrapper(
            wrapped=wrapped, wrapper=_synchronized_async_wrapper
        )

    return _SynchronizedFunctionWrapper(wrapped=wrapped, wrapper=_synchronized_wrapper)
473
474
# Single module-wide lock, stored as an attribute on synchronized() itself.
# It is held only while the per-context "_synchronized_lock" and
# "_synchronized_async_lock" attributes are lazily created and assigned.
synchronized._synchronized_meta_lock = Lock() # type: ignore[attr-defined]