Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_unix.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

72 statements  

1from __future__ import annotations 

2 

3import os 

4import sys 

5import warnings 

6from contextlib import suppress 

7from errno import EAGAIN, ENOSYS, EWOULDBLOCK 

8from pathlib import Path 

9from typing import cast 

10 

11from ._api import BaseFileLock 

12from ._util import ensure_directory_exists 

13 

14#: a flag to indicate if the fcntl API is available 

15has_fcntl = False 

16if sys.platform == "win32": # pragma: win32 cover 

17 

18 class UnixFileLock(BaseFileLock): 

19 """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems.""" 

20 

21 def _acquire(self) -> None: 

22 raise NotImplementedError 

23 

24 def _release(self) -> None: 

25 raise NotImplementedError 

26 

27else: # pragma: win32 no cover 

28 try: 

29 import fcntl 

30 

31 _ = (fcntl.flock, fcntl.LOCK_EX, fcntl.LOCK_NB, fcntl.LOCK_UN) 

32 except (ImportError, AttributeError): 

33 pass 

34 else: 

35 has_fcntl = True 

36 

37 class UnixFileLock(BaseFileLock): 

38 """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems.""" 

39 

40 def _acquire(self) -> None: # noqa: C901 

41 ensure_directory_exists(self.lock_file) 

42 open_flags = os.O_RDWR | os.O_TRUNC 

43 o_nofollow = getattr(os, "O_NOFOLLOW", None) 

44 if o_nofollow is not None: 

45 open_flags |= o_nofollow 

46 open_flags |= os.O_CREAT 

47 open_mode = self._open_mode() 

48 try: 

49 fd = os.open(self.lock_file, open_flags, open_mode) 

50 except PermissionError: 

51 # Sticky-bit dirs (e.g. /tmp): O_CREAT fails if the file is owned by another user (#317). 

52 # Fall back to opening the existing file without O_CREAT. 

53 if not Path(self.lock_file).exists(): 

54 raise 

55 try: 

56 fd = os.open(self.lock_file, open_flags & ~os.O_CREAT, open_mode) 

57 except FileNotFoundError: 

58 return 

59 if self.has_explicit_mode: 

60 with suppress(PermissionError): 

61 os.fchmod(fd, self._context.mode) 

62 try: 

63 fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 

64 except OSError as exception: 

65 os.close(fd) 

66 if exception.errno == ENOSYS: 

67 with suppress(OSError): 

68 Path(self.lock_file).unlink() 

69 self._fallback_to_soft_lock() 

70 self._acquire() 

71 return 

72 if exception.errno not in {EAGAIN, EWOULDBLOCK}: 

73 raise 

74 else: 

75 # The file may have been unlinked by a concurrent _release() between our open() and flock(). 

76 # A lock on an unlinked inode is useless — discard and let the retry loop start fresh. 

77 if os.fstat(fd).st_nlink == 0: 

78 os.close(fd) 

79 else: 

80 self._context.lock_file_fd = fd 

81 

82 def _fallback_to_soft_lock(self) -> None: 

83 from ._soft import SoftFileLock # noqa: PLC0415 

84 

85 warnings.warn("flock not supported on this filesystem, falling back to SoftFileLock", stacklevel=2) 

86 from .asyncio import AsyncSoftFileLock, BaseAsyncFileLock # noqa: PLC0415 

87 

88 self.__class__ = AsyncSoftFileLock if isinstance(self, BaseAsyncFileLock) else SoftFileLock 

89 

90 def _release(self) -> None: 

91 fd = cast("int", self._context.lock_file_fd) 

92 self._context.lock_file_fd = None 

93 with suppress(OSError): 

94 Path(self.lock_file).unlink() 

95 fcntl.flock(fd, fcntl.LOCK_UN) 

96 os.close(fd) 

97 

98 

99__all__ = [ 

100 "UnixFileLock", 

101 "has_fcntl", 

102]