Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

260 statements  

1import asyncio 

2import asyncio.coroutines 

3import contextvars 

4import functools 

5import inspect 

6import os 

7import sys 

8import threading 

9import warnings 

10import weakref 

11from concurrent.futures import Future, ThreadPoolExecutor 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Awaitable, 

16 Callable, 

17 Coroutine, 

18 Dict, 

19 Generic, 

20 List, 

21 Optional, 

22 TypeVar, 

23 Union, 

24 overload, 

25) 

26 

27from .current_thread_executor import CurrentThreadExecutor 

28from .local import Local 

29 

30if sys.version_info >= (3, 10): 

31 from typing import ParamSpec 

32else: 

33 from typing_extensions import ParamSpec 

34 

35if TYPE_CHECKING: 

36 # This is not available to import at runtime 

37 from _typeshed import OptExcInfo 

38 

39_F = TypeVar("_F", bound=Callable[..., Any]) 

40_P = ParamSpec("_P") 

41_R = TypeVar("_R") 

42 

43 

44def _restore_context(context: contextvars.Context) -> None: 

45 # Check for changes in contextvars, and set them to the current 

46 # context for downstream consumers 

47 for cvar in context: 

48 cvalue = context.get(cvar) 

49 try: 

50 if cvar.get() != cvalue: 

51 cvar.set(cvalue) 

52 except LookupError: 

53 cvar.set(cvalue) 

54 

55 

# Compatibility shim. Python 3.12 deprecates asyncio.iscoroutinefunction() as
# an alias for inspect.iscoroutinefunction() and drops the _is_coroutine
# marker, whose replacement is the inspect.markcoroutinefunction decorator.
# The fallback branch below can go once 3.12 is the oldest supported version.

if hasattr(inspect, "markcoroutinefunction"):
    # Modern interpreters: defer to the inspect module for both helpers.
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
    iscoroutinefunction = inspect.iscoroutinefunction
else:
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        # Tag the callable with the marker asyncio.iscoroutinefunction checks.
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func

70 

71 

class AsyncSingleThreadContext:
    """Context manager to run async code inside the same thread.

    Normally, AsyncToSync functions run either inside a separate ThreadPoolExecutor or
    the main event loop if it exists. This context manager ensures that all AsyncToSync
    functions execute within the same thread.

    This context manager is re-entrant, so only the outer-most call to
    AsyncSingleThreadContext will set the context.

    Usage:

    >>> import asyncio
    >>> with AsyncSingleThreadContext():
    ...     async_to_sync(asyncio.sleep)(1)
    """

    def __init__(self):
        # Reset token for AsyncToSync.async_single_thread_context. It is only
        # non-None while this instance is the one that installed itself.
        self.token = None

    def __enter__(self):
        """Install this instance as the active context unless one is already set."""
        try:
            AsyncToSync.async_single_thread_context.get()
        except LookupError:
            self.token = AsyncToSync.async_single_thread_context.set(self)

        return self

    def __exit__(self, exc, value, tb):
        """Tear down the executor and context variable if this instance set them."""
        # Take the token and clear it in one step. A contextvars token can only
        # be used for a single reset, so leaving a spent token on the instance
        # would make a later nested re-use of this object raise RuntimeError.
        token, self.token = self.token, None
        if token is None:
            # An outer context owns the variable; nothing to undo here.
            return

        executor = AsyncToSync.context_to_thread_executor.pop(self, None)
        if executor is not None:
            executor.shutdown()

        AsyncToSync.async_single_thread_context.reset(token)

109 

110 

class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    The ThreadSensitiveContext() context manager may be used to specify a
    thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep)(1)
    """

    def __init__(self):
        # Reset token for SyncToAsync.thread_sensitive_context. It is only
        # non-None while this instance is the one that installed itself.
        self.token = None

    async def __aenter__(self):
        """Install this instance as the active context unless one is already set."""
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        """Tear down the executor and context variable if this instance set them."""
        # Take the token and clear it in one step. A contextvars token can only
        # be used for a single reset, so leaving a spent token on the instance
        # would make a later nested re-use of this object raise RuntimeError.
        token, self.token = self.token, None
        if token is None:
            # An outer context owns the variable; nothing to undo here.
            return

        executor = SyncToAsync.context_to_thread_executor.pop(self, None)
        if executor is not None:
            executor.shutdown()
        SyncToAsync.thread_sensitive_context.reset(token)

150 

151 

class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    # The AsyncSingleThreadContext that is currently active, if any. It is set
    # and reset by AsyncSingleThreadContext.__enter__/__exit__.
    async_single_thread_context: "contextvars.ContextVar[AsyncSingleThreadContext]" = (
        contextvars.ContextVar("async_single_thread_context")
    )

    # Maps an active AsyncSingleThreadContext to the single-worker executor
    # that __call__ creates for it and re-uses on later calls. Keys are held
    # weakly.
    context_to_thread_executor: "weakref.WeakKeyDictionary[AsyncSingleThreadContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Store the callable and remember the event loop running in the
        constructing thread, if there is one.

        A warning (not an error) is emitted when ``awaitable`` is not callable
        or is not detected as a coroutine function.

        ``force_new_loop`` makes ``__call__`` skip the lookup of the event loop
        recorded by SyncToAsync in its thread-local storage. A loop captured
        here at construction time is still used.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        # Mirror the bound instance of a wrapped bound method, when present.
        try:
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion, blocking the calling thread,
        and return its result.

        An exception raised by the awaitable is stored on the result future by
        ``main_wrap`` and re-raised here by ``call_result.result()``.

        Raises RuntimeError when the calling thread has a running event loop.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                "You cannot use AsyncToSync in the same thread as an async event loop - "
                "just await the async function directly."
            )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor(old_executor)
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                # prepare an awaitable which can be passed as is to self.main_wrap,
                # so that `args` and `kwargs` don't need to be
                # destructured when passed to self.main_wrap
                # (which is required by `ParamSpec`)
                # as that may cause overlapping arguments
                self.awaitable(*args, **kwargs),
            )

            async def new_loop_wrap() -> None:
                # Register this frame's executor against the new loop for the
                # duration of the awaitable, so SyncToAsync can find it by loop.
                loop = asyncio.get_running_loop()
                self.loop_thread_executors[loop] = current_executor
                try:
                    await awaitable
                finally:
                    del self.loop_thread_executors[loop]

            if self.main_event_loop is not None:
                try:
                    self.main_event_loop.call_soon_threadsafe(
                        self.main_event_loop.create_task, awaitable
                    )
                except RuntimeError:
                    # Scheduling on the remembered loop failed; fall through to
                    # the new-loop path below.
                    # NOTE(review): presumably the loop was closed - confirm.
                    running_in_main_event_loop = False
                else:
                    running_in_main_event_loop = True
                    # Run the CurrentThreadExecutor until the future is done.
                    current_executor.run_until_future(call_result)
            else:
                running_in_main_event_loop = False

            if not running_in_main_event_loop:
                loop_executor = None

                if self.async_single_thread_context.get(None):
                    # Inside an AsyncSingleThreadContext: share one executor
                    # per context, creating it on first use.
                    single_thread_context = self.async_single_thread_context.get()

                    if single_thread_context in self.context_to_thread_executor:
                        loop_executor = self.context_to_thread_executor[
                            single_thread_context
                        ]
                    else:
                        loop_executor = ThreadPoolExecutor(max_workers=1)
                        self.context_to_thread_executor[
                            single_thread_context
                        ] = loop_executor
                else:
                    # Make our own event loop - in a new thread - and run inside that.
                    loop_executor = ThreadPoolExecutor(max_workers=1)

                loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
                # Run the CurrentThreadExecutor until the future is done.
                current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
        finally:
            # context[0] is replaced by main_wrap with a copy of the context as
            # it stood when the awaitable finished.
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods
        """
        # Descriptor hook: attribute access yields a partial of __call__ with
        # the owning object pre-bound, carrying the wrapped callable's metadata.
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """

        __traceback_hide__ = True  # noqa: F841

        # Apply the caller's context variables to the context this coroutine
        # runs in.
        if context is not None:
            _restore_context(context[0])

        # Publish the running task so that SyncToAsync.__call__ can cancel it.
        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await awaitable
            else:
                result = await awaitable
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Hand the post-run context back to __call__ via the shared list.
            context[0] = contextvars.copy_context()

374 

375 

class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func``.

        Raises TypeError when ``func`` is not callable or is detected as a
        coroutine function, and when ``executor`` is supplied together with
        ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Flag this instance so coroutine-function checks recognise it.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        # Mirror the bound instance of a wrapped bound method, when present.
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in an executor thread and return its result.

        Raises RuntimeError in thread-sensitive mode when the shared single
        thread executor is already in use in the current context.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # The function runs inside a copy of the current context. Changes it
        # makes there are copied back by _restore_context in the finally below.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        # Filled in by AsyncToSync.main_wrap with the task it runs in, when the
        # sync code calls back into async code.
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            # shield() keeps a cancellation of this coroutine from cancelling
            # exec_coro directly; the handler below decides what to cancel.
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Forward the cancellation to the nested async task, if one
                # was registered, and wait for it to finish.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No nested task was registered.
                pass
            if exec_coro.done():
                # The executor future has already finished: re-raise the
                # cancellation.
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            _restore_context(context)
            # Cleared unconditionally, whether or not this call set the flag.
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods
        """
        # Descriptor hook: attribute access yields a partial of __call__ with
        # the owning object pre-bound, carrying the wrapped function's metadata.
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        # The pid lets AsyncToSync ignore a loop recorded in another process.
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)

557 

558 

@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]: ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]: ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """Wrap an async callable in AsyncToSync, directly or as a decorator factory.

    Called with ``awaitable``, returns ``AsyncToSync(awaitable, ...)``.
    Called with keyword options only, returns a one-argument decorator that
    builds the AsyncToSync wrapper with those options.
    """
    if awaitable is not None:
        # Direct form: async_to_sync(fn) or a bare @async_to_sync decorator.
        return AsyncToSync(awaitable, force_new_loop=force_new_loop)

    # Factory form: @async_to_sync(force_new_loop=...).
    return lambda f: AsyncToSync(f, force_new_loop=force_new_loop)

607 

608 

@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]: ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]: ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """Wrap a sync callable in SyncToAsync, directly or as a decorator factory.

    Called with ``func``, returns ``SyncToAsync(func, ...)``. Called with
    keyword options only, returns a one-argument decorator that builds the
    SyncToAsync wrapper with those options.
    """
    if func is not None:
        # Direct form: sync_to_async(fn) or a bare @sync_to_async decorator.
        return SyncToAsync(func, thread_sensitive=thread_sensitive, executor=executor)

    # Factory form: @sync_to_async(thread_sensitive=..., executor=...).
    return lambda f: SyncToAsync(
        f, thread_sensitive=thread_sensitive, executor=executor
    )

647 )