Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/core/files/locks.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1""" 

2Portable file locking utilities. 

3 

4Based partially on an example by Jonathan Feignberg in the Python 

5Cookbook [1] (licensed under the Python Software License) and a ctypes port by 

6Anatoly Techtonik for Roundup [2] (license [3]). 

7 

8[1] https://code.activestate.com/recipes/65203/ 

9[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py # NOQA 

10[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt 

11 

12Example Usage:: 

13 

14 >>> from django.core.files import locks 

15 >>> with open('./file', 'wb') as f: 

16 ... locks.lock(f, locks.LOCK_EX) 

17 ... f.write('Django') 

18""" 

19 

20import os 

21 

22__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock") 

23 

24 

25def _fd(f): 

26 """Get a filedescriptor from something which could be a file or an fd.""" 

27 return f.fileno() if hasattr(f, "fileno") else f 

28 

29 

30if os.name == "nt": 

31 import msvcrt 

32 from ctypes import ( 

33 POINTER, 

34 Structure, 

35 Union, 

36 WinDLL, 

37 byref, 

38 c_int64, 

39 c_ulong, 

40 c_void_p, 

41 sizeof, 

42 ) 

43 from ctypes.wintypes import BOOL, DWORD, HANDLE 

44 

45 LOCK_SH = 0 # the default 

46 LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY 

47 LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK 

48 

49 # --- Adapted from the pyserial project --- 

50 # detect size of ULONG_PTR 

51 if sizeof(c_ulong) != sizeof(c_void_p): 

52 ULONG_PTR = c_int64 

53 else: 

54 ULONG_PTR = c_ulong 

55 PVOID = c_void_p 

56 

57 # --- Union inside Structure by stackoverflow:3480240 --- 

58 class _OFFSET(Structure): 

59 _fields_ = [("Offset", DWORD), ("OffsetHigh", DWORD)] 

60 

61 class _OFFSET_UNION(Union): 

62 _anonymous_ = ["_offset"] 

63 _fields_ = [("_offset", _OFFSET), ("Pointer", PVOID)] 

64 

65 class OVERLAPPED(Structure): 

66 _anonymous_ = ["_offset_union"] 

67 _fields_ = [ 

68 ("Internal", ULONG_PTR), 

69 ("InternalHigh", ULONG_PTR), 

70 ("_offset_union", _OFFSET_UNION), 

71 ("hEvent", HANDLE), 

72 ] 

73 

74 LPOVERLAPPED = POINTER(OVERLAPPED) 

75 

76 # --- Define function prototypes for extra safety --- 

77 kernel32 = WinDLL("kernel32") 

78 LockFileEx = kernel32.LockFileEx 

79 LockFileEx.restype = BOOL 

80 LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED] 

81 UnlockFileEx = kernel32.UnlockFileEx 

82 UnlockFileEx.restype = BOOL 

83 UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED] 

84 

85 def lock(f, flags): 

86 hfile = msvcrt.get_osfhandle(_fd(f)) 

87 overlapped = OVERLAPPED() 

88 ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped)) 

89 return bool(ret) 

90 

91 def unlock(f): 

92 hfile = msvcrt.get_osfhandle(_fd(f)) 

93 overlapped = OVERLAPPED() 

94 ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped)) 

95 return bool(ret) 

96 

97else: 

98 try: 

99 import fcntl 

100 

101 LOCK_SH = fcntl.LOCK_SH # shared lock 

102 LOCK_NB = fcntl.LOCK_NB # non-blocking 

103 LOCK_EX = fcntl.LOCK_EX 

104 except (ImportError, AttributeError): 

105 # File locking is not supported. 

106 LOCK_EX = LOCK_SH = LOCK_NB = 0 

107 

108 # Dummy functions that don't do anything. 

109 def lock(f, flags): 

110 # File is not locked 

111 return False 

112 

113 def unlock(f): 

114 # File is unlocked 

115 return True 

116 

117 else: 

118 

119 def lock(f, flags): 

120 try: 

121 fcntl.flock(_fd(f), flags) 

122 return True 

123 except BlockingIOError: 

124 return False 

125 

126 def unlock(f): 

127 fcntl.flock(_fd(f), fcntl.LOCK_UN) 

128 return True