Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/local.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1import asyncio 

2import contextlib 

3import contextvars 

4import threading 

5from typing import Any, Union 

6 

7 

8class _CVar: 

9 """Storage utility for Local.""" 

10 

11 def __init__(self) -> None: 

12 self._data: dict[str, contextvars.ContextVar[Any]] = {} 

13 

14 def __getattr__(self, key: str) -> Any: 

15 try: 

16 var = self._data[key] 

17 except KeyError: 

18 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

19 

20 try: 

21 return var.get() 

22 except LookupError: 

23 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

24 

25 def __setattr__(self, key: str, value: Any) -> None: 

26 if key == "_data": 

27 return super().__setattr__(key, value) 

28 

29 var = self._data.get(key) 

30 if var is None: 

31 self._data[key] = var = contextvars.ContextVar(key) 

32 var.set(value) 

33 

34 def __delattr__(self, key: str) -> None: 

35 if key in self._data: 

36 del self._data[key] 

37 else: 

38 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

39 

40 

class Local:
    """Local storage for async tasks.

    This is a namespace object (similar to `threading.local`) where data is
    also local to the current async task (if there is one).

    In async threads, local means in the same sense as the `contextvars`
    module - i.e. a value set in an async frame will be visible:

    - to other async code `await`-ed from this frame.
    - to tasks spawned using `asyncio` utilities (`create_task`, `wait_for`,
      `gather` and probably others).
    - to code scheduled in a sync thread using `sync_to_async`

    In "sync" threads (a thread with no async event loop running), the
    data is thread-local, but additionally shared with async code executed
    via the `async_to_sync` utility, which schedules async code in a new thread
    and copies context across to that thread.

    If `thread_critical` is True, then the local will only be visible per-thread,
    behaving exactly like `threading.local` if the thread is sync, and as
    `contextvars` if the thread is async. This allows genuinely thread-sensitive
    code (such as DB handles) to be kept strictly to their initial thread and
    disable the sharing across `sync_to_async` and `async_to_sync` wrapped calls.

    Unlike plain `contextvars` objects, this utility is threadsafe.
    """

    def __init__(self, thread_critical: bool = False) -> None:
        self._thread_critical = thread_critical
        self._thread_lock = threading.RLock()

        self._storage: "Union[threading.local, _CVar]"

        if thread_critical:
            # Thread-local storage
            self._storage = threading.local()
        else:
            # Contextvar storage
            self._storage = _CVar()

    @contextlib.contextmanager
    def _lock_storage(self):
        """Yield the storage object that attribute access should act on.

        For ``thread_critical=False`` the shared storage is yielded while
        holding ``_thread_lock``. For ``thread_critical=True`` the thread
        local is yielded directly in a sync thread, and a per-thread
        ``_CVar`` kept on it is yielded when an event loop is running.
        """
        if self._thread_critical:
            try:
                # Probe for async vs. sync thread: get_running_loop() raises
                # RuntimeError when there is no current loop.
                asyncio.get_running_loop()
            except RuntimeError:
                is_async = False
            else:
                is_async = True

            if not is_async:
                # Sync thread: the storage is just the plain thread local
                # (i.e. "global within this thread" - wherever you are in
                # the call stack you see the same storage).
                yield self._storage
            else:
                # Async thread: storage is still local to this thread, but
                # additionally should behave like a context var (only
                # visible within the same async call stack).

                # Ensure the context storage exists in the current thread.
                if not hasattr(self._storage, "cvar"):
                    self._storage.cvar = _CVar()

                # self._storage is a thread local, so its members can't be
                # accessed from another thread (no lock needed).
                yield self._storage.cvar
        else:
            # Lock for thread_critical=False, as other threads can access
            # the exact same storage object.
            with self._thread_lock:
                yield self._storage

    def __getattr__(self, key: str) -> Any:
        """Look ``key`` up in the active storage.

        Raises:
            AttributeError: if ``key`` is not set in the active storage, or
                is one of this object's own bookkeeping attributes that has
                not been initialised.
        """
        if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
            # __getattr__ only runs when normal lookup failed, so reaching
            # here means the bookkeeping attribute is missing (__init__ has
            # not run, e.g. an instance built with __new__). Going on to
            # _lock_storage() would read the same missing attribute and
            # recurse until RecursionError.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            )
        with self._lock_storage() as storage:
            return getattr(storage, key)

    def __setattr__(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the active storage."""
        if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
            # Bookkeeping attributes live on the instance itself.
            return super().__setattr__(key, value)
        with self._lock_storage() as storage:
            setattr(storage, key, value)

    def __delattr__(self, key: str) -> None:
        """Delete ``key`` from the active storage."""
        with self._lock_storage() as storage:
            delattr(storage, key)

123 

124 def __setattr__(self, key, value): 

125 if key in ("_local", "_storage", "_thread_critical", "_thread_lock"): 

126 return super().__setattr__(key, value) 

127 with self._lock_storage() as storage: 

128 setattr(storage, key, value) 

129 

130 def __delattr__(self, key): 

131 with self._lock_storage() as storage: 

132 delattr(storage, key)