Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/asgiref/sync.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

251 statements  

1import asyncio 

2import asyncio.coroutines 

3import contextvars 

4import functools 

5import inspect 

6import os 

7import sys 

8import threading 

9import warnings 

10import weakref 

11from concurrent.futures import Future, ThreadPoolExecutor 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Awaitable, 

16 Callable, 

17 Coroutine, 

18 Dict, 

19 Generic, 

20 List, 

21 Optional, 

22 TypeVar, 

23 Union, 

24 overload, 

25) 

26 

27from .current_thread_executor import CurrentThreadExecutor 

28from .local import Local 

29 

30if sys.version_info >= (3, 10): 

31 from typing import ParamSpec 

32else: 

33 from typing_extensions import ParamSpec 

34 

35if TYPE_CHECKING: 

36 # This is not available to import at runtime 

37 from _typeshed import OptExcInfo 

38 

39_F = TypeVar("_F", bound=Callable[..., Any]) 

40_P = ParamSpec("_P") 

41_R = TypeVar("_R") 

42 

43 

def _restore_context(context: contextvars.Context) -> None:
    """
    Copy every variable held in ``context`` into the current context.

    Used after code has run inside a copied context so that any changes it
    made to context variables become visible to downstream consumers.

    Values are compared by identity rather than with ``!=``: stored values
    may define comparison operators that raise or that return something
    which cannot be truth-tested (array-like objects, for example), and
    re-setting a variable to an equal value is harmless.

    :param context: the context whose variable values should be applied to
        the currently active context.
    """
    for cvar, cvalue in context.items():
        try:
            if cvar.get() is cvalue:
                # Already holds exactly this object; nothing to do.
                continue
        except LookupError:
            # The variable has no value in the current context yet.
            pass
        cvar.set(cvalue)

54 

55 

56# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for 

57# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker. 

58# The latter is replaced with the inspect.markcoroutinefunction decorator. 

59# Until 3.12 is the minimum supported Python version, provide a shim. 

60 

if hasattr(inspect, "markcoroutinefunction"):
    # The interpreter provides inspect.markcoroutinefunction, so use the
    # inspect-based pair directly.
    iscoroutinefunction = inspect.iscoroutinefunction
    markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
else:
    # Older interpreters: fall back to asyncio's detection, which honours the
    # private _is_coroutine marker set by the shim below.
    iscoroutinefunction = asyncio.iscoroutinefunction  # type: ignore[assignment]

    def markcoroutinefunction(func: _F) -> _F:
        """
        Mark ``func`` as a coroutine function and return it.

        Sets the ``_is_coroutine`` attribute on ``func`` to asyncio's private
        marker object; the callable itself is returned unchanged otherwise.
        """
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func

70 

71 

class ThreadSensitiveContext:
    """Async context manager that scopes the thread-sensitive executor.

    In thread sensitive mode a single thread pool executor is shared within
    the process by default. Wrapping code in ThreadSensitiveContext() gives
    that code its own thread pool instead.

    The manager is re-entrant: when one is already active, nested uses leave
    the active context untouched, so only the outer-most one sets (and later
    resets) the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token returned by ContextVar.set(); remains None unless this
        # instance is the one that installed itself as the active context.
        self.token = None

    async def __aenter__(self):
        active = SyncToAsync.thread_sensitive_context
        unset = object()
        if active.get(unset) is unset:
            # No context is active yet, so this instance becomes the owner.
            self.token = active.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        if self.token:
            # Only the owning instance tears down the per-context executor
            # and restores the previous context variable state.
            pool = SyncToAsync.context_to_thread_executor.pop(self, None)
            if pool:
                pool.shutdown()
            SyncToAsync.thread_sensitive_context.reset(self.token)

111 

112 

class AsyncToSync(Generic[_P, _R]):
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Keeps a reference to the CurrentThreadExecutor in local context, so that
    # any sync_to_async inside the wrapped code can find it.
    executors: "Local" = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(
        self,
        awaitable: Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ],
        force_new_loop: bool = False,
    ):
        """
        Wrap ``awaitable`` so that it can be called synchronously.

        :param awaitable: the async callable to wrap. A warning (not an
            error) is issued when it is not detected as a coroutine function.
        :param force_new_loop: when true, ``__call__`` skips the lookup of a
            parent event loop recorded by SyncToAsync.
        """
        if not callable(awaitable) or (
            not iscoroutinefunction(awaitable)
            and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        # Mirror __self__ from the wrapped callable when it has one (e.g. a
        # bound method); plain functions simply don't get the attribute.
        try:
            self.__self__ = self.awaitable.__self__  # type: ignore[union-attr]
        except AttributeError:
            pass
        self.force_new_loop = force_new_loop
        # Remember the loop running at construction time, if there is one.
        self.main_event_loop = None
        try:
            self.main_event_loop = asyncio.get_running_loop()
        except RuntimeError:
            # There's no event loop in this thread.
            pass

    def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped awaitable to completion and return its result.

        The awaitable is scheduled either on the known main event loop (when
        it is running) or on a brand new loop in a helper thread, while this
        thread services a CurrentThreadExecutor until the work is finished.

        :raises RuntimeError: if called from a thread that has a running
            event loop.
        :return: whatever the awaitable returned; an exception raised by the
            awaitable is re-raised from the result future.
        """
        __traceback_hide__ = True  # noqa: F841

        if not self.force_new_loop and not self.main_event_loop:
            # There's no event loop in this thread. Look for the threadlocal if
            # we're inside SyncToAsync
            main_event_loop_pid = getattr(
                SyncToAsync.threadlocal, "main_event_loop_pid", None
            )
            # We make sure the parent loop is from the same process - if
            # they've forked, this is not going to be valid any more (#194)
            if main_event_loop_pid and main_event_loop_pid == os.getpid():
                self.main_event_loop = getattr(
                    SyncToAsync.threadlocal, "main_event_loop", None
                )

        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
                    "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )

        # Make a future for the return information
        call_result: "Future[_R]" = Future()

        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        old_executor = getattr(self.executors, "current", None)
        current_executor = CurrentThreadExecutor()
        self.executors.current = current_executor

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Get task context so that parent task knows which task to propagate
        # an asyncio.CancelledError to.
        task_context = getattr(SyncToAsync.threadlocal, "task_context", None)

        loop = None
        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            # The exception currently being handled (if any) is captured here
            # so main_wrap can re-establish it around the awaitable.
            awaitable = self.main_wrap(
                call_result,
                sys.exc_info(),
                task_context,
                context,
                *args,
                **kwargs,
            )

            if not (self.main_event_loop and self.main_event_loop.is_running()):
                # Make our own event loop - in a new thread - and run inside that.
                loop = asyncio.new_event_loop()
                # Register the executor under the new loop so SyncToAsync can
                # look it up by running loop.
                self.loop_thread_executors[loop] = current_executor
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(
                    self._run_event_loop, loop, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
            else:
                # Call it inside the existing loop
                self.main_event_loop.call_soon_threadsafe(
                    self.main_event_loop.create_task, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(call_result)
        finally:
            # Clean up any executor we were running
            if loop is not None:
                del self.loop_thread_executors[loop]
            # Apply the context snapshot main_wrap left in context[0] to the
            # calling context.
            _restore_context(context[0])
            # Restore old current thread executor state
            self.executors.current = old_executor

        # Wait for results from the future.
        return call_result.result()

    def _run_event_loop(self, loop, coro):
        """
        Runs the given event loop (designed to be called in a thread).

        Runs ``coro`` to completion on ``loop``, then cancels the loop's
        remaining tasks, reports any exception they ended with, shuts down
        async generators and closes the loop.
        """
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        finally:
            try:
                # mimic asyncio.run() behavior
                # cancel all tasks still registered on the loop
                tasks = asyncio.all_tasks(loop)
                for task in tasks:
                    task.cancel()

                async def gather():
                    await asyncio.gather(*tasks, return_exceptions=True)

                # Let the cancelled tasks finish unwinding.
                loop.run_until_complete(gather())
                for task in tasks:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during loop shutdown",
                                "exception": task.exception(),
                                "task": task,
                            }
                        )
                if hasattr(loop, "shutdown_asyncgens"):
                    loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
                # Point this thread's event loop back at main_event_loop
                # (which may be None).
                asyncio.set_event_loop(self.main_event_loop)

    def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
        """
        Include self for methods

        Descriptor hook: returns ``__call__`` with ``parent`` bound as the
        first positional argument, carrying the wrapped awaitable's metadata.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self,
        call_result: "Future[_R]",
        exc_info: "OptExcInfo",
        task_context: "Optional[List[asyncio.Task[Any]]]",
        context: List[contextvars.Context],
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> None:
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.

        :param call_result: future that receives the awaitable's result or
            the exception it raised.
        :param exc_info: ``sys.exc_info()`` captured by the caller; when it
            holds an exception, the awaitable runs inside an ``except`` block
            for that exception.
        :param task_context: optional list to which the current task is
            appended while the awaitable runs, and removed afterwards.
        :param context: single-element list; ``context[0]`` is applied on
            entry and replaced with a copy of the current context on exit.
        """

        __traceback_hide__ = True  # noqa: F841

        if context is not None:
            _restore_context(context[0])

        current_task = asyncio.current_task()
        if current_task is not None and task_context is not None:
            task_context.append(current_task)

        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await self.awaitable(*args, **kwargs)
            else:
                result = await self.awaitable(*args, **kwargs)
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            if current_task is not None and task_context is not None:
                task_context.remove(current_task)
            # Hand the (possibly modified) context back to the sync caller.
            context[0] = contextvars.copy_context()

340 

341 

class SyncToAsync(Generic[_P, _R]):
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[_P, _R],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """
        Wrap the synchronous callable ``func`` so that it can be awaited.

        :param func: the sync callable to wrap.
        :param thread_sensitive: whether calls go through the thread-sensitive
            executor selection in ``__call__``.
        :param executor: executor to use when ``thread_sensitive`` is false;
            ``None`` means the loop's default executor.
        :raises TypeError: if ``func`` is not callable or is a coroutine
            function, or if ``executor`` is given together with
            ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or iscoroutinefunction(func)
            or iscoroutinefunction(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Flag this instance so coroutine-function detection recognises it.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        # Mirror __self__ from the wrapped callable when it has one.
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
        """
        Run the wrapped function in an executor thread and return its result.

        The function runs inside a copy of the current context; variables from
        that copy are applied back to the calling context afterwards.

        :raises RuntimeError: in thread-sensitive mode, when the shared single
            thread executor would be needed while ``deadlock_context`` is
            already set in this context.
        """
        __traceback_hide__ = True  # noqa: F841
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            current_thread_executor = getattr(AsyncToSync.executors, "current", None)
            if current_thread_executor:
                # If we have a parent sync thread above somewhere, use that
                executor = current_thread_executor
            elif self.thread_sensitive_context.get(None):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        # The sync function is invoked through context.run so that it executes
        # inside the copied context.
        func = context.run
        # Handed to the worker thread via thread_handler; AsyncToSync.main_wrap
        # appends its running task here so it can be cancelled below.
        task_context: List[asyncio.Task[Any]] = []

        # Run the code in the right thread
        exec_coro = loop.run_in_executor(
            executor,
            functools.partial(
                self.thread_handler,
                loop,
                sys.exc_info(),
                task_context,
                func,
                child,
            ),
        )
        ret: _R
        try:
            # shield() keeps a cancellation of this coroutine from cancelling
            # the executor future itself.
            ret = await asyncio.shield(exec_coro)
        except asyncio.CancelledError:
            cancel_parent = True
            try:
                # Forward the cancellation to the task recorded in
                # task_context, if there is one, and wait for it.
                task = task_context[0]
                task.cancel()
                try:
                    await task
                    # The task finished without raising CancelledError.
                    cancel_parent = False
                except asyncio.CancelledError:
                    pass
            except IndexError:
                # No task was recorded.
                pass
            if exec_coro.done():
                raise
            if cancel_parent:
                exec_coro.cancel()
            ret = await exec_coro
        finally:
            _restore_context(context)
            self.deadlock_context.set(False)

        return ret

    def __get__(
        self, parent: Any, objtype: Any
    ) -> Callable[_P, Coroutine[Any, Any, _R]]:
        """
        Include self for methods

        Descriptor hook: returns ``__call__`` with ``parent`` bound as the
        first positional argument, carrying the wrapped function's metadata.
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.

        Records ``loop``, the current process id and ``task_context`` on the
        thread-local storage, then calls ``func(*args, **kwargs)`` and returns
        its result. When ``exc_info`` holds an exception, the call is made
        inside an ``except`` block for that exception.
        """

        __traceback_hide__ = True  # noqa: F841

        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        self.threadlocal.task_context = task_context

        # Run the function
        # If we have an exception, run the function inside the except block
        # after raising it so exc_info is correctly populated.
        if exc_info[1]:
            try:
                raise exc_info[1]
            except BaseException:
                return func(*args, **kwargs)
        else:
            return func(*args, **kwargs)

523 

524 

@overload
def async_to_sync(
    *,
    force_new_loop: bool = False,
) -> Callable[
    [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
    Callable[_P, _R],
]:
    ...


@overload
def async_to_sync(
    awaitable: Union[
        Callable[_P, Coroutine[Any, Any, _R]],
        Callable[_P, Awaitable[_R]],
    ],
    *,
    force_new_loop: bool = False,
) -> Callable[_P, _R]:
    ...


def async_to_sync(
    awaitable: Optional[
        Union[
            Callable[_P, Coroutine[Any, Any, _R]],
            Callable[_P, Awaitable[_R]],
        ]
    ] = None,
    *,
    force_new_loop: bool = False,
) -> Union[
    Callable[
        [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
        Callable[_P, _R],
    ],
    Callable[_P, _R],
]:
    """
    Wrap an async callable in AsyncToSync, directly or as a decorator factory.

    With ``awaitable`` given, the AsyncToSync wrapper is returned straight
    away. Without it, a one-argument callable is returned that builds the
    wrapper for whatever it is applied to, using the same ``force_new_loop``.
    """
    if awaitable is not None:
        # Direct form: async_to_sync(fn) or a bare @async_to_sync.
        return AsyncToSync(
            awaitable,
            force_new_loop=force_new_loop,
        )
    # Factory form: @async_to_sync(force_new_loop=...).
    return lambda fn: AsyncToSync(
        fn,
        force_new_loop=force_new_loop,
    )

573 

574 

@overload
def sync_to_async(
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
    ...


@overload
def sync_to_async(
    func: Callable[_P, _R],
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[_P, Coroutine[Any, Any, _R]]:
    ...


def sync_to_async(
    func: Optional[Callable[_P, _R]] = None,
    *,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Union[
    Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
    Callable[_P, Coroutine[Any, Any, _R]],
]:
    """
    Wrap a sync callable in SyncToAsync, directly or as a decorator factory.

    With ``func`` given, the SyncToAsync wrapper is returned straight away.
    Without it, a one-argument callable is returned that builds the wrapper
    for whatever it is applied to, using the same ``thread_sensitive`` and
    ``executor`` settings.
    """
    if func is not None:
        # Direct form: sync_to_async(fn) or a bare @sync_to_async.
        return SyncToAsync(
            func,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )
    # Factory form: @sync_to_async(thread_sensitive=..., executor=...).
    return lambda fn: SyncToAsync(
        fn,
        thread_sensitive=thread_sensitive,
        executor=executor,
    )