1"""
2Portable file locking utilities.
3
Based partially on an example by Jonathan Feinberg in the Python
Cookbook [1] (licensed under the Python Software License) and a ctypes port by
Anatoly Techtonik for Roundup [2] (license [3]).
7
8[1] https://code.activestate.com/recipes/65203/
9[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py # NOQA
10[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt
11
Example Usage::

    >>> from django.core.files import locks
    >>> with open('./file', 'wb') as f:
    ...     locks.lock(f, locks.LOCK_EX)
    ...     f.write(b'Django')
18"""
19
20import os
21
# Public API of this module: the three lock-mode flags and the two locking
# functions. Each name is defined once per platform branch further down.
__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock")
23
24
25def _fd(f):
26 """Get a filedescriptor from something which could be a file or an fd."""
27 return f.fileno() if hasattr(f, "fileno") else f
28
29
30if os.name == "nt":
31 import msvcrt
32 from ctypes import (
33 POINTER,
34 Structure,
35 Union,
36 WinDLL,
37 byref,
38 c_int64,
39 c_ulong,
40 c_void_p,
41 sizeof,
42 )
43 from ctypes.wintypes import BOOL, DWORD, HANDLE
44
# Lock-mode flags for Windows. lock() forwards its ``flags`` argument
# unchanged as the flags parameter of LockFileEx, so these carry the Win32
# bit values directly.
LOCK_SH = 0  # the default (no flag bits set)
LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK
48
# --- Adapted from the pyserial project ---
# detect size of ULONG_PTR
# When c_ulong and c_void_p differ in size, a 64-bit integer type is used
# for ULONG_PTR; otherwise c_ulong is already the same size as a pointer.
if sizeof(c_ulong) != sizeof(c_void_p):
    ULONG_PTR = c_int64
else:
    ULONG_PTR = c_ulong
# Generic pointer alias, used for the ``Pointer`` member of _OFFSET_UNION.
PVOID = c_void_p
56
57 # --- Union inside Structure by stackoverflow:3480240 ---
class _OFFSET(Structure):
    # Pair of DWORD fields, ``Offset`` and ``OffsetHigh``. It is embedded
    # anonymously in _OFFSET_UNION, which OVERLAPPED embeds in turn.
    # NOTE(review): in the Win32 OVERLAPPED layout these are the low and high
    # halves of the file position - confirm against the Win32 documentation.
    _fields_ = [("Offset", DWORD), ("OffsetHigh", DWORD)]
60
class _OFFSET_UNION(Union):
    # Union of the _OFFSET pair and a single ``Pointer`` (PVOID) member.
    # Listing "_offset" in _anonymous_ exposes its ``Offset`` and
    # ``OffsetHigh`` fields directly on the union.
    _anonymous_ = ["_offset"]
    _fields_ = [("_offset", _OFFSET), ("Pointer", PVOID)]
64
class OVERLAPPED(Structure):
    # ctypes layout of the structure that lock() and unlock() pass by
    # reference as the last argument of LockFileEx / UnlockFileEx.
    # The anonymous "_offset_union" member makes ``Offset``, ``OffsetHigh``
    # and ``Pointer`` reachable as attributes of OVERLAPPED itself.
    _anonymous_ = ["_offset_union"]
    _fields_ = [
        ("Internal", ULONG_PTR),
        ("InternalHigh", ULONG_PTR),
        ("_offset_union", _OFFSET_UNION),
        ("hEvent", HANDLE),
    ]
73
# Pointer-to-OVERLAPPED type, used as the last entry of both argtypes lists.
LPOVERLAPPED = POINTER(OVERLAPPED)

# --- Define function prototypes for extra safety ---
# With restype/argtypes declared, ctypes converts and checks the arguments
# instead of guessing their C types at call time.
kernel32 = WinDLL("kernel32")
LockFileEx = kernel32.LockFileEx
LockFileEx.restype = BOOL
# Win32 order: hFile, dwFlags, dwReserved, nNumberOfBytesToLockLow,
# nNumberOfBytesToLockHigh, lpOverlapped
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
# Win32 order: hFile, dwReserved, nNumberOfBytesToUnlockLow,
# nNumberOfBytesToUnlockHigh, lpOverlapped
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
84
def lock(f, flags):
    """Lock *f* through the Win32 ``LockFileEx`` call.

    *f* may be a file object or a file descriptor. *flags* is forwarded
    unchanged as the flags argument of ``LockFileEx``. Returns ``True`` when
    the call reports success (non-zero result) and ``False`` otherwise.
    """
    handle = msvcrt.get_osfhandle(_fd(f))
    # A freshly constructed OVERLAPPED leaves the offset fields at their
    # default values; the byte count is given as low=0, high=0xFFFF0000.
    region = OVERLAPPED()
    return bool(LockFileEx(handle, flags, 0, 0, 0xFFFF0000, byref(region)))
90
def unlock(f):
    """Unlock *f* through the Win32 ``UnlockFileEx`` call.

    *f* may be a file object or a file descriptor. The same byte range used
    by ``lock()`` (low=0, high=0xFFFF0000) is passed. Returns ``True`` when
    the call reports success (non-zero result) and ``False`` otherwise.
    """
    handle = msvcrt.get_osfhandle(_fd(f))
    region = OVERLAPPED()
    return bool(UnlockFileEx(handle, 0, 0, 0xFFFF0000, byref(region)))
96
97else:
98 try:
99 import fcntl
100
# Re-export the fcntl flag values under this module's own names; lock()
# passes them straight to fcntl.flock().
LOCK_SH = fcntl.LOCK_SH  # shared lock
LOCK_NB = fcntl.LOCK_NB  # non-blocking
LOCK_EX = fcntl.LOCK_EX  # exclusive lock
104 except (ImportError, AttributeError):
# File locking is not supported.
# The flag names are still defined (all as 0), presumably so that code
# importing them keeps working on such platforms - verify against callers.
LOCK_EX = LOCK_SH = LOCK_NB = 0
107
108 # Dummy functions that don't do anything.
def lock(f, flags):
    """Do nothing; stand-in for platforms without file locking support.

    Both arguments are ignored. Always returns ``False``, meaning the file
    is not locked.
    """
    return False
112
def unlock(f):
    """Do nothing; stand-in for platforms without file locking support.

    The argument is ignored. Always returns ``True``, meaning the file is
    unlocked.
    """
    return True
116
117 else:
118
def lock(f, flags):
    """Apply an ``fcntl.flock`` lock to *f*.

    *f* may be a file object or a file descriptor. *flags* is passed to
    ``fcntl.flock`` unchanged. Returns ``True`` once the call succeeds and
    ``False`` if it raises ``BlockingIOError``. Any other exception
    propagates to the caller.
    """
    try:
        fcntl.flock(_fd(f), flags)
    except BlockingIOError:
        return False
    else:
        return True
125
def unlock(f):
    """Release the ``fcntl.flock`` lock held on *f*.

    *f* may be a file object or a file descriptor. Always returns ``True``
    when ``fcntl.flock`` returns. Exceptions it raises are not caught.
    """
    descriptor = _fd(f)
    fcntl.flock(descriptor, fcntl.LOCK_UN)
    return True