Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/sync.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

235 statements  

1import asyncio 

2import asyncio.coroutines 

3import contextvars 

4import functools 

5import inspect 

6import os 

7import sys 

8import threading 

9import warnings 

10import weakref 

11from concurrent.futures import Future, ThreadPoolExecutor 

12from typing import ( 

13 TYPE_CHECKING, 

14 Any, 

15 Awaitable, 

16 Callable, 

17 Coroutine, 

18 Dict, 

19 Generic, 

20 List, 

21 Optional, 

22 TypeVar, 

23 Union, 

24 overload, 

25) 

26 

27from .current_thread_executor import CurrentThreadExecutor 

28from .local import Local 

29 

30if sys.version_info >= (3, 10): 

31 from typing import ParamSpec 

32else: 

33 from typing_extensions import ParamSpec 

34 

35if TYPE_CHECKING: 

36 # This is not available to import at runtime 

37 from _typeshed import OptExcInfo 

38 

39_F = TypeVar("_F", bound=Callable[..., Any]) 

40_P = ParamSpec("_P") 

41_R = TypeVar("_R") 

42 

43 

44def _restore_context(context: contextvars.Context) -> None: 

45 # Check for changes in contextvars, and set them to the current 

46 # context for downstream consumers 

47 for cvar in context: 

48 cvalue = context.get(cvar) 

49 try: 

50 if cvar.get() != cvalue: 

51 cvar.set(cvalue) 

52 except LookupError: 

53 cvar.set(cvalue) 

54 

55 

56# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for 

57# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker. 

58# The latter is replaced with the inspect.markcoroutinefunction decorator. 

59# Until 3.12 is the minimum supported Python version, provide a shim. 

60 

61if hasattr(inspect, "markcoroutinefunction"): 

62 iscoroutinefunction = inspect.iscoroutinefunction 

63 markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction 

64else: 

65 iscoroutinefunction = asyncio.iscoroutinefunction # type: ignore[assignment] 

66 

67 def markcoroutinefunction(func: _F) -> _F: 

68 func._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore 

69 return func 

70 

71 

72class ThreadSensitiveContext: 

73 """Async context manager to manage context for thread sensitive mode 

74 

75 This context manager controls which thread pool executor is used when in 

76 thread sensitive mode. By default, a single thread pool executor is shared 

77 within a process. 

78 

79 The ThreadSensitiveContext() context manager may be used to specify a 

80 thread pool per context. 

81 

82 This context manager is re-entrant, so only the outer-most call to 

83 ThreadSensitiveContext will set the context. 

84 

85 Usage: 

86 

87 >>> import time 

88 >>> async with ThreadSensitiveContext(): 

89 ... await sync_to_async(time.sleep, 1)() 

90 """ 

91 

92 def __init__(self): 

93 self.token = None 

94 

95 async def __aenter__(self): 

96 try: 

97 SyncToAsync.thread_sensitive_context.get() 

98 except LookupError: 

99 self.token = SyncToAsync.thread_sensitive_context.set(self) 

100 

101 return self 

102 

103 async def __aexit__(self, exc, value, tb): 

104 if not self.token: 

105 return 

106 

107 executor = SyncToAsync.context_to_thread_executor.pop(self, None) 

108 if executor: 

109 executor.shutdown() 

110 SyncToAsync.thread_sensitive_context.reset(self.token) 

111 

112 

113class AsyncToSync(Generic[_P, _R]): 

114 """ 

115 Utility class which turns an awaitable that only works on the thread with 

116 the event loop into a synchronous callable that works in a subthread. 

117 

118 If the call stack contains an async loop, the code runs there. 

119 Otherwise, the code runs in a new loop in a new thread. 

120 

121 Either way, this thread then pauses and waits to run any thread_sensitive 

122 code called from further down the call stack using SyncToAsync, before 

123 finally exiting once the async task returns. 

124 """ 

125 

126 # Keeps a reference to the CurrentThreadExecutor in local context, so that 

127 # any sync_to_async inside the wrapped code can find it. 

128 executors: "Local" = Local() 

129 

130 # When we can't find a CurrentThreadExecutor from the context, such as 

131 # inside create_task, we'll look it up here from the running event loop. 

132 loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {} 

133 

134 def __init__( 

135 self, 

136 awaitable: Union[ 

137 Callable[_P, Coroutine[Any, Any, _R]], 

138 Callable[_P, Awaitable[_R]], 

139 ], 

140 force_new_loop: bool = False, 

141 ): 

142 if not callable(awaitable) or ( 

143 not iscoroutinefunction(awaitable) 

144 and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable)) 

145 ): 

146 # Python does not have very reliable detection of async functions 

147 # (lots of false negatives) so this is just a warning. 

148 warnings.warn( 

149 "async_to_sync was passed a non-async-marked callable", stacklevel=2 

150 ) 

151 self.awaitable = awaitable 

152 try: 

153 self.__self__ = self.awaitable.__self__ # type: ignore[union-attr] 

154 except AttributeError: 

155 pass 

156 self.force_new_loop = force_new_loop 

157 self.main_event_loop = None 

158 try: 

159 self.main_event_loop = asyncio.get_running_loop() 

160 except RuntimeError: 

161 # There's no event loop in this thread. 

162 pass 

163 

164 def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: 

165 __traceback_hide__ = True # noqa: F841 

166 

167 if not self.force_new_loop and not self.main_event_loop: 

168 # There's no event loop in this thread. Look for the threadlocal if 

169 # we're inside SyncToAsync 

170 main_event_loop_pid = getattr( 

171 SyncToAsync.threadlocal, "main_event_loop_pid", None 

172 ) 

173 # We make sure the parent loop is from the same process - if 

174 # they've forked, this is not going to be valid any more (#194) 

175 if main_event_loop_pid and main_event_loop_pid == os.getpid(): 

176 self.main_event_loop = getattr( 

177 SyncToAsync.threadlocal, "main_event_loop", None 

178 ) 

179 

180 # You can't call AsyncToSync from a thread with a running event loop 

181 try: 

182 asyncio.get_running_loop() 

183 except RuntimeError: 

184 pass 

185 else: 

186 raise RuntimeError( 

187 "You cannot use AsyncToSync in the same thread as an async event loop - " 

188 "just await the async function directly." 

189 ) 

190 

191 # Make a future for the return information 

192 call_result: "Future[_R]" = Future() 

193 

194 # Make a CurrentThreadExecutor we'll use to idle in this thread - we 

195 # need one for every sync frame, even if there's one above us in the 

196 # same thread. 

197 old_executor = getattr(self.executors, "current", None) 

198 current_executor = CurrentThreadExecutor(old_executor) 

199 self.executors.current = current_executor 

200 

201 # Wrapping context in list so it can be reassigned from within 

202 # `main_wrap`. 

203 context = [contextvars.copy_context()] 

204 

205 # Get task context so that parent task knows which task to propagate 

206 # an asyncio.CancelledError to. 

207 task_context = getattr(SyncToAsync.threadlocal, "task_context", None) 

208 

209 # Use call_soon_threadsafe to schedule a synchronous callback on the 

210 # main event loop's thread if it's there, otherwise make a new loop 

211 # in this thread. 

212 try: 

213 awaitable = self.main_wrap( 

214 call_result, 

215 sys.exc_info(), 

216 task_context, 

217 context, 

218 # prepare an awaitable which can be passed as is to self.main_wrap, 

219 # so that `args` and `kwargs` don't need to be 

220 # destructured when passed to self.main_wrap 

221 # (which is required by `ParamSpec`) 

222 # as that may cause overlapping arguments 

223 self.awaitable(*args, **kwargs), 

224 ) 

225 

226 async def new_loop_wrap() -> None: 

227 loop = asyncio.get_running_loop() 

228 self.loop_thread_executors[loop] = current_executor 

229 try: 

230 await awaitable 

231 finally: 

232 del self.loop_thread_executors[loop] 

233 

234 if self.main_event_loop is not None: 

235 try: 

236 self.main_event_loop.call_soon_threadsafe( 

237 self.main_event_loop.create_task, awaitable 

238 ) 

239 except RuntimeError: 

240 running_in_main_event_loop = False 

241 else: 

242 running_in_main_event_loop = True 

243 # Run the CurrentThreadExecutor until the future is done. 

244 current_executor.run_until_future(call_result) 

245 else: 

246 running_in_main_event_loop = False 

247 

248 if not running_in_main_event_loop: 

249 # Make our own event loop - in a new thread - and run inside that. 

250 loop_executor = ThreadPoolExecutor(max_workers=1) 

251 loop_future = loop_executor.submit(asyncio.run, new_loop_wrap()) 

252 # Run the CurrentThreadExecutor until the future is done. 

253 current_executor.run_until_future(loop_future) 

254 # Wait for future and/or allow for exception propagation 

255 loop_future.result() 

256 finally: 

257 _restore_context(context[0]) 

258 # Restore old current thread executor state 

259 self.executors.current = old_executor 

260 

261 # Wait for results from the future. 

262 return call_result.result() 

263 

264 def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]: 

265 """ 

266 Include self for methods 

267 """ 

268 func = functools.partial(self.__call__, parent) 

269 return functools.update_wrapper(func, self.awaitable) 

270 

271 async def main_wrap( 

272 self, 

273 call_result: "Future[_R]", 

274 exc_info: "OptExcInfo", 

275 task_context: "Optional[List[asyncio.Task[Any]]]", 

276 context: List[contextvars.Context], 

277 awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]], 

278 ) -> None: 

279 """ 

280 Wraps the awaitable with something that puts the result into the 

281 result/exception future. 

282 """ 

283 

284 __traceback_hide__ = True # noqa: F841 

285 

286 if context is not None: 

287 _restore_context(context[0]) 

288 

289 current_task = asyncio.current_task() 

290 if current_task is not None and task_context is not None: 

291 task_context.append(current_task) 

292 

293 try: 

294 # If we have an exception, run the function inside the except block 

295 # after raising it so exc_info is correctly populated. 

296 if exc_info[1]: 

297 try: 

298 raise exc_info[1] 

299 except BaseException: 

300 result = await awaitable 

301 else: 

302 result = await awaitable 

303 except BaseException as e: 

304 call_result.set_exception(e) 

305 else: 

306 call_result.set_result(result) 

307 finally: 

308 if current_task is not None and task_context is not None: 

309 task_context.remove(current_task) 

310 context[0] = contextvars.copy_context() 

311 

312 

313class SyncToAsync(Generic[_P, _R]): 

314 """ 

315 Utility class which turns a synchronous callable into an awaitable that 

316 runs in a threadpool. It also sets a threadlocal inside the thread so 

317 calls to AsyncToSync can escape it. 

318 

319 If thread_sensitive is passed, the code will run in the same thread as any 

320 outer code. This is needed for underlying Python code that is not 

321 threadsafe (for example, code which handles SQLite database connections). 

322 

323 If the outermost program is async (i.e. SyncToAsync is outermost), then 

324 this will be a dedicated single sub-thread that all sync code runs in, 

325 one after the other. If the outermost program is sync (i.e. AsyncToSync is 

326 outermost), this will just be the main thread. This is achieved by idling 

327 with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent, 

328 rather than just blocking. 

329 

330 If executor is passed in, that will be used instead of the loop's default executor. 

331 In order to pass in an executor, thread_sensitive must be set to False, otherwise 

332 a TypeError will be raised. 

333 """ 

334 

335 # Storage for main event loop references 

336 threadlocal = threading.local() 

337 

338 # Single-thread executor for thread-sensitive code 

339 single_thread_executor = ThreadPoolExecutor(max_workers=1) 

340 

341 # Maintain a contextvar for the current execution context. Optionally used 

342 # for thread sensitive mode. 

343 thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = ( 

344 contextvars.ContextVar("thread_sensitive_context") 

345 ) 

346 

347 # Contextvar that is used to detect if the single thread executor 

348 # would be awaited on while already being used in the same context 

349 deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar( 

350 "deadlock_context" 

351 ) 

352 

353 # Maintaining a weak reference to the context ensures that thread pools are 

354 # erased once the context goes out of scope. This terminates the thread pool. 

355 context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = ( 

356 weakref.WeakKeyDictionary() 

357 ) 

358 

359 def __init__( 

360 self, 

361 func: Callable[_P, _R], 

362 thread_sensitive: bool = True, 

363 executor: Optional["ThreadPoolExecutor"] = None, 

364 ) -> None: 

365 if ( 

366 not callable(func) 

367 or iscoroutinefunction(func) 

368 or iscoroutinefunction(getattr(func, "__call__", func)) 

369 ): 

370 raise TypeError("sync_to_async can only be applied to sync functions.") 

371 self.func = func 

372 functools.update_wrapper(self, func) 

373 self._thread_sensitive = thread_sensitive 

374 markcoroutinefunction(self) 

375 if thread_sensitive and executor is not None: 

376 raise TypeError("executor must not be set when thread_sensitive is True") 

377 self._executor = executor 

378 try: 

379 self.__self__ = func.__self__ # type: ignore 

380 except AttributeError: 

381 pass 

382 

383 async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: 

384 __traceback_hide__ = True # noqa: F841 

385 loop = asyncio.get_running_loop() 

386 

387 # Work out what thread to run the code in 

388 if self._thread_sensitive: 

389 current_thread_executor = getattr(AsyncToSync.executors, "current", None) 

390 if current_thread_executor: 

391 # If we have a parent sync thread above somewhere, use that 

392 executor = current_thread_executor 

393 elif self.thread_sensitive_context.get(None): 

394 # If we have a way of retrieving the current context, attempt 

395 # to use a per-context thread pool executor 

396 thread_sensitive_context = self.thread_sensitive_context.get() 

397 

398 if thread_sensitive_context in self.context_to_thread_executor: 

399 # Re-use thread executor in current context 

400 executor = self.context_to_thread_executor[thread_sensitive_context] 

401 else: 

402 # Create new thread executor in current context 

403 executor = ThreadPoolExecutor(max_workers=1) 

404 self.context_to_thread_executor[thread_sensitive_context] = executor 

405 elif loop in AsyncToSync.loop_thread_executors: 

406 # Re-use thread executor for running loop 

407 executor = AsyncToSync.loop_thread_executors[loop] 

408 elif self.deadlock_context.get(False): 

409 raise RuntimeError( 

410 "Single thread executor already being used, would deadlock" 

411 ) 

412 else: 

413 # Otherwise, we run it in a fixed single thread 

414 executor = self.single_thread_executor 

415 self.deadlock_context.set(True) 

416 else: 

417 # Use the passed in executor, or the loop's default if it is None 

418 executor = self._executor 

419 

420 context = contextvars.copy_context() 

421 child = functools.partial(self.func, *args, **kwargs) 

422 func = context.run 

423 task_context: List[asyncio.Task[Any]] = [] 

424 

425 # Run the code in the right thread 

426 exec_coro = loop.run_in_executor( 

427 executor, 

428 functools.partial( 

429 self.thread_handler, 

430 loop, 

431 sys.exc_info(), 

432 task_context, 

433 func, 

434 child, 

435 ), 

436 ) 

437 ret: _R 

438 try: 

439 ret = await asyncio.shield(exec_coro) 

440 except asyncio.CancelledError: 

441 cancel_parent = True 

442 try: 

443 task = task_context[0] 

444 task.cancel() 

445 try: 

446 await task 

447 cancel_parent = False 

448 except asyncio.CancelledError: 

449 pass 

450 except IndexError: 

451 pass 

452 if exec_coro.done(): 

453 raise 

454 if cancel_parent: 

455 exec_coro.cancel() 

456 ret = await exec_coro 

457 finally: 

458 _restore_context(context) 

459 self.deadlock_context.set(False) 

460 

461 return ret 

462 

463 def __get__( 

464 self, parent: Any, objtype: Any 

465 ) -> Callable[_P, Coroutine[Any, Any, _R]]: 

466 """ 

467 Include self for methods 

468 """ 

469 func = functools.partial(self.__call__, parent) 

470 return functools.update_wrapper(func, self.func) 

471 

472 def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs): 

473 """ 

474 Wraps the sync application with exception handling. 

475 """ 

476 

477 __traceback_hide__ = True # noqa: F841 

478 

479 # Set the threadlocal for AsyncToSync 

480 self.threadlocal.main_event_loop = loop 

481 self.threadlocal.main_event_loop_pid = os.getpid() 

482 self.threadlocal.task_context = task_context 

483 

484 # Run the function 

485 # If we have an exception, run the function inside the except block 

486 # after raising it so exc_info is correctly populated. 

487 if exc_info[1]: 

488 try: 

489 raise exc_info[1] 

490 except BaseException: 

491 return func(*args, **kwargs) 

492 else: 

493 return func(*args, **kwargs) 

494 

495 

496@overload 

497def async_to_sync( 

498 *, 

499 force_new_loop: bool = False, 

500) -> Callable[ 

501 [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]], 

502 Callable[_P, _R], 

503]: 

504 ... 

505 

506 

507@overload 

508def async_to_sync( 

509 awaitable: Union[ 

510 Callable[_P, Coroutine[Any, Any, _R]], 

511 Callable[_P, Awaitable[_R]], 

512 ], 

513 *, 

514 force_new_loop: bool = False, 

515) -> Callable[_P, _R]: 

516 ... 

517 

518 

519def async_to_sync( 

520 awaitable: Optional[ 

521 Union[ 

522 Callable[_P, Coroutine[Any, Any, _R]], 

523 Callable[_P, Awaitable[_R]], 

524 ] 

525 ] = None, 

526 *, 

527 force_new_loop: bool = False, 

528) -> Union[ 

529 Callable[ 

530 [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]], 

531 Callable[_P, _R], 

532 ], 

533 Callable[_P, _R], 

534]: 

535 if awaitable is None: 

536 return lambda f: AsyncToSync( 

537 f, 

538 force_new_loop=force_new_loop, 

539 ) 

540 return AsyncToSync( 

541 awaitable, 

542 force_new_loop=force_new_loop, 

543 ) 

544 

545 

546@overload 

547def sync_to_async( 

548 *, 

549 thread_sensitive: bool = True, 

550 executor: Optional["ThreadPoolExecutor"] = None, 

551) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]: 

552 ... 

553 

554 

555@overload 

556def sync_to_async( 

557 func: Callable[_P, _R], 

558 *, 

559 thread_sensitive: bool = True, 

560 executor: Optional["ThreadPoolExecutor"] = None, 

561) -> Callable[_P, Coroutine[Any, Any, _R]]: 

562 ... 

563 

564 

565def sync_to_async( 

566 func: Optional[Callable[_P, _R]] = None, 

567 *, 

568 thread_sensitive: bool = True, 

569 executor: Optional["ThreadPoolExecutor"] = None, 

570) -> Union[ 

571 Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]], 

572 Callable[_P, Coroutine[Any, Any, _R]], 

573]: 

574 if func is None: 

575 return lambda f: SyncToAsync( 

576 f, 

577 thread_sensitive=thread_sensitive, 

578 executor=executor, 

579 ) 

580 return SyncToAsync( 

581 func, 

582 thread_sensitive=thread_sensitive, 

583 executor=executor, 

584 )