Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/asgiref/sync.py: 2%

241 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1import asyncio 

2import asyncio.coroutines 

3import contextvars 

4import functools 

5import inspect 

6import os 

7import sys 

8import threading 

9import warnings 

10import weakref 

11from concurrent.futures import Future, ThreadPoolExecutor 

12from typing import Any, Callable, Dict, Optional, overload 

13 

14from .current_thread_executor import CurrentThreadExecutor 

15from .local import Local 

16 

17 

def _restore_context(context):
    """Copy the variables recorded in ``context`` into the active context.

    For every context variable present in ``context``, the variable is set
    in the currently running context when it is either unset there or its
    current value compares unequal to the recorded one.

    Args:
        context: A ``contextvars.Context`` (or any mapping of context
            variables to values supporting ``.get``) to copy values from.
    """
    # Check for changes in contextvars, and set them to the current
    # context for downstream consumers
    for cvar in context:
        cvalue = context.get(cvar)
        try:
            if cvar.get() != cvalue:
                cvar.set(cvalue)
        except LookupError:
            # The variable has no value at all in the current context.
            cvar.set(cvalue)

27 

28 

# Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
# inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
# The latter is replaced with the inspect.markcoroutinefunction decorator.
# Until 3.12 is the minimum supported Python version, provide a shim.
# Django 4.0 only supports 3.8+, so there is no need to be concerned with the
# _or_partial backport.

34 

# Type hint: should be generic: whatever T it takes it returns. (Same id)
def markcoroutinefunction(func: Any) -> Any:
    """Mark ``func`` so that it is reported as a coroutine function.

    When the running interpreter provides ``inspect.markcoroutinefunction``
    it is used and its return value is passed back. Otherwise the private
    asyncio ``_is_coroutine`` marker is attached to ``func`` and ``func``
    itself is returned.

    Args:
        func: The callable to mark.

    Returns:
        The marked callable.
    """
    if hasattr(inspect, "markcoroutinefunction"):
        return inspect.markcoroutinefunction(func)
    else:
        func._is_coroutine = asyncio.coroutines._is_coroutine  # type: ignore
        return func

42 

43 

def iscoroutinefunction(func: Any) -> bool:
    """Report whether ``func`` is (or is marked as) a coroutine function.

    Delegates to ``inspect.iscoroutinefunction`` when the interpreter
    provides ``inspect.markcoroutinefunction``, and to
    ``asyncio.iscoroutinefunction`` otherwise.

    Args:
        func: The object to inspect.

    Returns:
        The result of the underlying standard-library check.
    """
    if hasattr(inspect, "markcoroutinefunction"):
        return inspect.iscoroutinefunction(func)
    else:
        return asyncio.iscoroutinefunction(func)

49 

50 

def _iscoroutinefunction_or_partial(func: Any) -> bool:
    """Coroutine-function check that also sees through methods and partials.

    On Python 3.8 and later this simply defers to ``iscoroutinefunction``.
    On older interpreters bound methods and ``functools.partial`` wrappers
    are unwrapped first, then the underlying callable is checked.

    Args:
        func: The object to inspect.

    Returns:
        The result of ``iscoroutinefunction`` on the (possibly unwrapped)
        callable.
    """
    # Python < 3.8 does not correctly determine partially wrapped
    # coroutine functions are coroutine functions, hence the need for
    # this to exist. Code taken from CPython.
    if sys.version_info >= (3, 8):
        return iscoroutinefunction(func)
    else:
        while inspect.ismethod(func):
            func = func.__func__
        while isinstance(func, functools.partial):
            func = func.func

        return iscoroutinefunction(func)

64 

65 

class ThreadSensitiveContext:
    """Async context manager to manage context for thread sensitive mode

    This context manager controls which thread pool executor is used when in
    thread sensitive mode. By default, a single thread pool executor is shared
    within a process.

    In Python 3.7+, the ThreadSensitiveContext() context manager may be used to
    specify a thread pool per context.

    This context manager is re-entrant, so only the outer-most call to
    ThreadSensitiveContext will set the context.

    Usage:

    >>> import time
    >>> async with ThreadSensitiveContext():
    ...     await sync_to_async(time.sleep, 1)()
    """

    def __init__(self):
        # Token returned by ContextVar.set(); stays None when an enclosing
        # ThreadSensitiveContext already owns the context variable.
        self.token = None

    async def __aenter__(self):
        """Install this instance as the thread-sensitive context if none is set."""
        try:
            SyncToAsync.thread_sensitive_context.get()
        except LookupError:
            # No context is active yet, so this (outer-most) instance sets it.
            self.token = SyncToAsync.thread_sensitive_context.set(self)

        return self

    async def __aexit__(self, exc, value, tb):
        """Shut down this context's executor and reset the context variable.

        Does nothing when this instance did not set the context variable
        on entry.
        """
        if not self.token:
            return

        executor = SyncToAsync.context_to_thread_executor.pop(self, None)
        if executor:
            executor.shutdown()
        SyncToAsync.thread_sensitive_context.reset(self.token)

105 

106 

class AsyncToSync:
    """
    Utility class which turns an awaitable that only works on the thread with
    the event loop into a synchronous callable that works in a subthread.

    If the call stack contains an async loop, the code runs there.
    Otherwise, the code runs in a new loop in a new thread.

    Either way, this thread then pauses and waits to run any thread_sensitive
    code called from further down the call stack using SyncToAsync, before
    finally exiting once the async task returns.
    """

    # Maps launched Tasks to the threads that launched them (for locals impl)
    launch_map: "Dict[asyncio.Task[object], threading.Thread]" = {}

    # Keeps track of which CurrentThreadExecutor to use. This uses an asgiref
    # Local, not a threadlocal, so that tasks can work out what their parent used.
    executors = Local()

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

    def __init__(self, awaitable, force_new_loop=False):
        """Wrap ``awaitable`` and work out which event loop it should run on.

        Args:
            awaitable: The coroutine function to expose synchronously. A
                warning (not an error) is issued when it does not look like
                a coroutine function.
            force_new_loop: When true, no existing loop is remembered, so
                every call runs the awaitable in a freshly created loop.
        """
        if not callable(awaitable) or (
            not _iscoroutinefunction_or_partial(awaitable)
            and not _iscoroutinefunction_or_partial(
                getattr(awaitable, "__call__", awaitable)
            )
        ):
            # Python does not have very reliable detection of async functions
            # (lots of false negatives) so this is just a warning.
            warnings.warn(
                "async_to_sync was passed a non-async-marked callable", stacklevel=2
            )
        self.awaitable = awaitable
        try:
            self.__self__ = self.awaitable.__self__
        except AttributeError:
            pass
        if force_new_loop:
            # They have asked that we always run in a new sub-loop.
            self.main_event_loop = None
        else:
            try:
                self.main_event_loop = asyncio.get_running_loop()
            except RuntimeError:
                # There's no event loop in this thread. Look for the threadlocal if
                # we're inside SyncToAsync
                main_event_loop_pid = getattr(
                    SyncToAsync.threadlocal, "main_event_loop_pid", None
                )
                # We make sure the parent loop is from the same process - if
                # they've forked, this is not going to be valid any more (#194)
                if main_event_loop_pid and main_event_loop_pid == os.getpid():
                    self.main_event_loop = getattr(
                        SyncToAsync.threadlocal, "main_event_loop", None
                    )
                else:
                    self.main_event_loop = None

    def __call__(self, *args, **kwargs):
        """Run the wrapped awaitable to completion and return its result.

        Raises:
            RuntimeError: If called from a thread that has a running event
                loop.
            BaseException: Whatever the awaitable raised, re-raised through
                the result future.
        """
        # You can't call AsyncToSync from a thread with a running event loop
        try:
            event_loop = asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            if event_loop.is_running():
                raise RuntimeError(
                    "You cannot use AsyncToSync in the same thread as an async event loop - "
                    "just await the async function directly."
                )

        # Wrapping context in list so it can be reassigned from within
        # `main_wrap`.
        context = [contextvars.copy_context()]

        # Make a future for the return information
        call_result = Future()
        # Get the source thread
        source_thread = threading.current_thread()
        # Make a CurrentThreadExecutor we'll use to idle in this thread - we
        # need one for every sync frame, even if there's one above us in the
        # same thread.
        if hasattr(self.executors, "current"):
            old_current_executor = self.executors.current
        else:
            old_current_executor = None
        current_executor = CurrentThreadExecutor()
        self.executors.current = current_executor
        loop = None
        # Use call_soon_threadsafe to schedule a synchronous callback on the
        # main event loop's thread if it's there, otherwise make a new loop
        # in this thread.
        try:
            awaitable = self.main_wrap(
                args, kwargs, call_result, source_thread, sys.exc_info(), context
            )

            if not (self.main_event_loop and self.main_event_loop.is_running()):
                # Make our own event loop - in a new thread - and run inside that.
                loop = asyncio.new_event_loop()
                self.loop_thread_executors[loop] = current_executor
                loop_executor = ThreadPoolExecutor(max_workers=1)
                loop_future = loop_executor.submit(
                    self._run_event_loop, loop, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(loop_future)
                # Wait for future and/or allow for exception propagation
                loop_future.result()
            else:
                # Call it inside the existing loop
                self.main_event_loop.call_soon_threadsafe(
                    self.main_event_loop.create_task, awaitable
                )
                if current_executor:
                    # Run the CurrentThreadExecutor until the future is done
                    current_executor.run_until_future(call_result)
        finally:
            # Clean up any executor we were running
            if loop is not None:
                del self.loop_thread_executors[loop]
            if hasattr(self.executors, "current"):
                del self.executors.current
            if old_current_executor:
                self.executors.current = old_current_executor
            # Bring contextvar changes made by the awaitable back to the caller.
            _restore_context(context[0])

        # Wait for results from the future.
        return call_result.result()

    def _run_event_loop(self, loop, coro):
        """
        Runs the given event loop (designed to be called in a thread).
        """
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro)
        finally:
            try:
                # mimic asyncio.run() behavior:
                # cancel every task still registered on the loop and wait
                # for the cancellations to be processed
                tasks = asyncio.all_tasks(loop)
                for task in tasks:
                    task.cancel()

                async def gather():
                    await asyncio.gather(*tasks, return_exceptions=True)

                loop.run_until_complete(gather())
                for task in tasks:
                    if task.cancelled():
                        continue
                    if task.exception() is not None:
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during loop shutdown",
                                "exception": task.exception(),
                                "task": task,
                            }
                        )
                if hasattr(loop, "shutdown_asyncgens"):
                    loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                loop.close()
                asyncio.set_event_loop(self.main_event_loop)

    def __get__(self, parent, objtype):
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.awaitable)

    async def main_wrap(
        self, args, kwargs, call_result, source_thread, exc_info, context
    ):
        """
        Wraps the awaitable with something that puts the result into the
        result/exception future.
        """
        if context is not None:
            _restore_context(context[0])

        current_task = SyncToAsync.get_current_task()
        self.launch_map[current_task] = source_thread
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    result = await self.awaitable(*args, **kwargs)
            else:
                result = await self.awaitable(*args, **kwargs)
        except BaseException as e:
            call_result.set_exception(e)
        else:
            call_result.set_result(result)
        finally:
            del self.launch_map[current_task]

            # Hand the (possibly modified) context back to the sync caller.
            context[0] = contextvars.copy_context()

315 

316 

class SyncToAsync:
    """
    Utility class which turns a synchronous callable into an awaitable that
    runs in a threadpool. It also sets a threadlocal inside the thread so
    calls to AsyncToSync can escape it.

    If thread_sensitive is passed, the code will run in the same thread as any
    outer code. This is needed for underlying Python code that is not
    threadsafe (for example, code which handles SQLite database connections).

    If the outermost program is async (i.e. SyncToAsync is outermost), then
    this will be a dedicated single sub-thread that all sync code runs in,
    one after the other. If the outermost program is sync (i.e. AsyncToSync is
    outermost), this will just be the main thread. This is achieved by idling
    with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
    rather than just blocking.

    If executor is passed in, that will be used instead of the loop's default executor.
    In order to pass in an executor, thread_sensitive must be set to False, otherwise
    a TypeError will be raised.
    """

    # Maps launched threads to the coroutines that spawned them
    launch_map: "Dict[threading.Thread, asyncio.Task[object]]" = {}

    # Storage for main event loop references
    threadlocal = threading.local()

    # Single-thread executor for thread-sensitive code
    single_thread_executor = ThreadPoolExecutor(max_workers=1)

    # Maintain a contextvar for the current execution context. Optionally used
    # for thread sensitive mode. ThreadSensitiveContext stores itself here.
    thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
        contextvars.ContextVar("thread_sensitive_context")
    )

    # Contextvar that is used to detect if the single thread executor
    # would be awaited on while already being used in the same context
    deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
        "deadlock_context"
    )

    # Maintaining a weak reference to the context ensures that thread pools are
    # erased once the context goes out of scope. This terminates the thread pool.
    context_to_thread_executor: "weakref.WeakKeyDictionary[object, ThreadPoolExecutor]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(
        self,
        func: Callable[..., Any],
        thread_sensitive: bool = True,
        executor: Optional["ThreadPoolExecutor"] = None,
    ) -> None:
        """Wrap the synchronous callable ``func``.

        Args:
            func: The sync callable to expose as an awaitable.
            thread_sensitive: Whether calls must follow the thread-sensitive
                executor selection performed in ``__call__``.
            executor: Executor to run ``func`` in; only allowed when
                ``thread_sensitive`` is False.

        Raises:
            TypeError: If ``func`` is not callable or is a coroutine
                function, or if ``executor`` is given together with
                ``thread_sensitive=True``.
        """
        if (
            not callable(func)
            or _iscoroutinefunction_or_partial(func)
            or _iscoroutinefunction_or_partial(getattr(func, "__call__", func))
        ):
            raise TypeError("sync_to_async can only be applied to sync functions.")
        self.func = func
        functools.update_wrapper(self, func)
        self._thread_sensitive = thread_sensitive
        # Make this instance report as a coroutine function to introspection.
        markcoroutinefunction(self)
        if thread_sensitive and executor is not None:
            raise TypeError("executor must not be set when thread_sensitive is True")
        self._executor = executor
        try:
            self.__self__ = func.__self__  # type: ignore
        except AttributeError:
            pass

    async def __call__(self, *args, **kwargs):
        """Run the wrapped callable in an executor thread and return its result.

        Raises:
            RuntimeError: If the shared single-thread executor would have to
                be used while the deadlock marker is already set in this
                context.
        """
        loop = asyncio.get_running_loop()

        # Work out what thread to run the code in
        if self._thread_sensitive:
            if hasattr(AsyncToSync.executors, "current"):
                # If we have a parent sync thread above somewhere, use that
                executor = AsyncToSync.executors.current
            elif self.thread_sensitive_context and self.thread_sensitive_context.get(
                None
            ):
                # If we have a way of retrieving the current context, attempt
                # to use a per-context thread pool executor
                thread_sensitive_context = self.thread_sensitive_context.get()

                if thread_sensitive_context in self.context_to_thread_executor:
                    # Re-use thread executor in current context
                    executor = self.context_to_thread_executor[thread_sensitive_context]
                else:
                    # Create new thread executor in current context
                    executor = ThreadPoolExecutor(max_workers=1)
                    self.context_to_thread_executor[thread_sensitive_context] = executor
            elif loop in AsyncToSync.loop_thread_executors:
                # Re-use thread executor for running loop
                executor = AsyncToSync.loop_thread_executors[loop]
            elif self.deadlock_context and self.deadlock_context.get(False):
                raise RuntimeError(
                    "Single thread executor already being used, would deadlock"
                )
            else:
                # Otherwise, we run it in a fixed single thread
                executor = self.single_thread_executor
                if self.deadlock_context:
                    self.deadlock_context.set(True)
        else:
            # Use the passed in executor, or the loop's default if it is None
            executor = self._executor

        # Run the callable inside a copy of the current context so that
        # contextvar changes it makes can be copied back afterwards.
        context = contextvars.copy_context()
        child = functools.partial(self.func, *args, **kwargs)
        func = context.run
        args = (child,)
        kwargs = {}

        try:
            # Run the code in the right thread
            future = loop.run_in_executor(
                executor,
                functools.partial(
                    self.thread_handler,
                    loop,
                    self.get_current_task(),
                    sys.exc_info(),
                    func,
                    *args,
                    **kwargs,
                ),
            )
            ret = await asyncio.wait_for(future, timeout=None)

        finally:
            _restore_context(context)
            if self.deadlock_context:
                self.deadlock_context.set(False)

        return ret

    def __get__(self, parent, objtype):
        """
        Include self for methods
        """
        func = functools.partial(self.__call__, parent)
        return functools.update_wrapper(func, self.func)

    def thread_handler(self, loop, source_task, exc_info, func, *args, **kwargs):
        """
        Wraps the sync application with exception handling.
        """
        # Set the threadlocal for AsyncToSync
        self.threadlocal.main_event_loop = loop
        self.threadlocal.main_event_loop_pid = os.getpid()
        # Set the task mapping (used for the locals module)
        current_thread = threading.current_thread()
        if AsyncToSync.launch_map.get(source_task) == current_thread:
            # Our parent task was launched from this same thread, so don't make
            # a launch map entry - let it shortcut over us! (and stop infinite loops)
            parent_set = False
        else:
            self.launch_map[current_thread] = source_task
            parent_set = True
        # Run the function
        try:
            # If we have an exception, run the function inside the except block
            # after raising it so exc_info is correctly populated.
            if exc_info[1]:
                try:
                    raise exc_info[1]
                except BaseException:
                    return func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        finally:
            # Only delete the launch_map parent if we set it, otherwise it is
            # from someone else.
            if parent_set:
                del self.launch_map[current_thread]

    @staticmethod
    def get_current_task():
        """
        Implementation of asyncio.current_task()
        that returns None if there is no task.
        """
        try:
            return asyncio.current_task()
        except RuntimeError:
            return None

507 

508 

# Lowercase aliases (and decorator friendliness)
async_to_sync = AsyncToSync


@overload
def sync_to_async(
    func: None = None,
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> Callable[[Callable[..., Any]], SyncToAsync]:
    ...


@overload
def sync_to_async(
    func: Callable[..., Any],
    thread_sensitive: bool = True,
    executor: Optional["ThreadPoolExecutor"] = None,
) -> SyncToAsync:
    ...


def sync_to_async(
    func=None,
    thread_sensitive=True,
    executor=None,
):
    """Wrap a sync callable in ``SyncToAsync``, directly or as a decorator.

    Args:
        func: The sync callable to wrap. When omitted, a decorator is
            returned instead so the options can be given first.
        thread_sensitive: Forwarded to ``SyncToAsync``.
        executor: Forwarded to ``SyncToAsync``.

    Returns:
        A ``SyncToAsync`` instance when ``func`` is given, otherwise a
        one-argument callable that builds one with the supplied options.
    """
    if func is None:
        return lambda f: SyncToAsync(
            f,
            thread_sensitive=thread_sensitive,
            executor=executor,
        )
    return SyncToAsync(
        func,
        thread_sensitive=thread_sensitive,
        executor=executor,
    )

546 )