Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/asgiref/local.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

60 statements  

1import asyncio 

2import contextlib 

3import contextvars 

4import threading 

5from typing import Any, Dict, Union 

6 

7 

8class _CVar: 

9 """Storage utility for Local.""" 

10 

11 def __init__(self) -> None: 

12 self._data: "contextvars.ContextVar[Dict[str, Any]]" = contextvars.ContextVar( 

13 "asgiref.local" 

14 ) 

15 

16 def __getattr__(self, key): 

17 storage_object = self._data.get({}) 

18 try: 

19 return storage_object[key] 

20 except KeyError: 

21 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

22 

23 def __setattr__(self, key: str, value: Any) -> None: 

24 if key == "_data": 

25 return super().__setattr__(key, value) 

26 

27 storage_object = self._data.get({}).copy() 

28 storage_object[key] = value 

29 self._data.set(storage_object) 

30 

31 def __delattr__(self, key: str) -> None: 

32 storage_object = self._data.get({}).copy() 

33 if key in storage_object: 

34 del storage_object[key] 

35 self._data.set(storage_object) 

36 else: 

37 raise AttributeError(f"{self!r} object has no attribute {key!r}") 

38 

39 

class Local:
    """Local storage for async tasks.

    This is a namespace object (similar to `threading.local`) where data is
    also local to the current async task (if there is one).

    In async threads, local means in the same sense as the `contextvars`
    module - i.e. a value set in an async frame will be visible:

    - to other async code `await`-ed from this frame.
    - to tasks spawned using `asyncio` utilities (`create_task`, `wait_for`,
      `gather` and probably others).
    - to code scheduled in a sync thread using `sync_to_async`

    In "sync" threads (a thread with no async event loop running), the
    data is thread-local, but additionally shared with async code executed
    via the `async_to_sync` utility, which schedules async code in a new thread
    and copies context across to that thread.

    If `thread_critical` is True, then the local will only be visible per-thread,
    behaving exactly like `threading.local` if the thread is sync, and as
    `contextvars` if the thread is async. This allows genuinely thread-sensitive
    code (such as DB handles) to be kept strictly to their initial thread and
    disable the sharing across `sync_to_async` and `async_to_sync` wrapped calls.

    Unlike plain `contextvars` objects, this utility is threadsafe.
    """

    def __init__(self, thread_critical: bool = False) -> None:
        self._thread_critical = thread_critical
        self._thread_lock = threading.RLock()

        self._storage: "Union[threading.local, _CVar]"

        if thread_critical:
            # Thread-local storage
            self._storage = threading.local()
        else:
            # Contextvar storage
            self._storage = _CVar()

    @contextlib.contextmanager
    def _lock_storage(self):
        """Yield the storage object that attribute access should act on.

        With ``thread_critical=False`` the shared storage is yielded while
        holding ``_thread_lock``. With ``thread_critical=True`` no lock is
        taken: the thread-local itself is yielded when no event loop is
        running in this thread, otherwise a per-thread ``_CVar`` kept on the
        thread-local (created on first use) is yielded.
        """
        if not self._thread_critical:
            # Lock for thread_critical=False as other threads
            # can access the exact same storage object
            with self._thread_lock:
                yield self._storage
            return

        # Test whether this is an async or a sync thread:
        # get_running_loop() raises RuntimeError if there is no current loop.
        is_async = True
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            is_async = False

        if not is_async:
            # We are in a sync thread, the storage is just the plain thread
            # local (i.e. "global within this thread" - it doesn't matter
            # where you are in a call stack, you see the same storage).
            yield self._storage
            return

        # We are in an async thread - storage is still local to this thread,
        # but additionally should behave like a context var (is only visible
        # within the same async call stack).

        # Ensure context exists in the current thread
        if not hasattr(self._storage, "cvar"):
            self._storage.cvar = _CVar()

        # self._storage is a thread local, so the members can't be accessed
        # in another thread (we don't need any locks)
        yield self._storage.cvar

    def __getattr__(self, key):
        """Return the attribute *key* from the currently applicable storage.

        Raises:
            AttributeError: if *key* is not set in that storage, or if *key*
                is one of this object's own internal attributes and it is
                missing from the instance.
        """
        # __getattr__ only runs after normal lookup has failed. The internal
        # names are never kept in storage (see __setattr__), and
        # _lock_storage() reads them from self, so looking them up there
        # would re-enter this method without end on an instance that was
        # created without __init__.
        if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            )
        with self._lock_storage() as storage:
            return getattr(storage, key)

    def __setattr__(self, key, value):
        """Set *key* to *value* in the currently applicable storage.

        The internal attribute names are set on the instance itself.
        """
        if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
            return super().__setattr__(key, value)
        with self._lock_storage() as storage:
            setattr(storage, key, value)

    def __delattr__(self, key):
        """Delete *key* from the currently applicable storage.

        Any AttributeError raised by the storage for a missing *key*
        propagates to the caller.
        """
        with self._lock_storage() as storage:
            delattr(storage, key)