Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/filelock/_unix.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

45 statements  

1from __future__ import annotations 

2 

3import os 

4import sys 

5from contextlib import suppress 

6from errno import ENOSYS 

7from pathlib import Path 

8from typing import cast 

9 

10from ._api import BaseFileLock 

11from ._util import ensure_directory_exists 

12 

#: a flag to indicate if the fcntl API is available
has_fcntl = False
if sys.platform == "win32":  # pragma: win32 cover

    class UnixFileLock(BaseFileLock):
        """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems."""

        def _acquire(self) -> None:
            """Always raise :class:`NotImplementedError`; this branch is the win32 placeholder."""
            raise NotImplementedError

        def _release(self) -> None:
            """Always raise :class:`NotImplementedError`; this branch is the win32 placeholder."""
            raise NotImplementedError

else:  # pragma: win32 no cover
    try:
        import fcntl

        # Touch every fcntl attribute used below so that a module lacking any
        # of them is detected here (AttributeError) rather than at lock time.
        _ = (fcntl.flock, fcntl.LOCK_EX, fcntl.LOCK_NB, fcntl.LOCK_UN)
    except (ImportError, AttributeError):
        pass
    else:
        has_fcntl = True

    class UnixFileLock(BaseFileLock):
        """Uses the :func:`fcntl.flock` to hard lock the lock file on unix systems."""

        def _acquire(self) -> None:
            """
            Try once, without blocking, to take an exclusive ``flock`` on the lock file.

            On success the open descriptor is stored in ``self._context.lock_file_fd``.
            If ``flock`` fails with any :class:`OSError` other than ``ENOSYS`` the
            descriptor is closed and the method returns without setting
            ``lock_file_fd``.

            :raises NotImplementedError: if ``flock`` reports ``ENOSYS``.
            """
            ensure_directory_exists(self.lock_file)
            open_flags = os.O_RDWR | os.O_TRUNC
            # O_CREAT is requested only when the file is missing.
            # NOTE(review): presumably this avoids permission errors when the file
            # already exists but belongs to another user -- confirm before changing.
            if not Path(self.lock_file).exists():
                open_flags |= os.O_CREAT
            try:
                fd = os.open(self.lock_file, open_flags, self._context.mode)
            except FileNotFoundError:
                if open_flags & os.O_CREAT:
                    # Creation was already requested, so the failure is not the
                    # exists()/open() race handled below; let it propagate.
                    raise
                # The file vanished between the exists() check and os.open();
                # retry once, this time asking for it to be created.
                fd = os.open(self.lock_file, open_flags | os.O_CREAT, self._context.mode)
            with suppress(PermissionError):  # This lock file is not owned by this UID
                os.fchmod(fd, self._context.mode)
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError as exception:
                os.close(fd)
                if exception.errno == ENOSYS:  # NotImplemented error
                    msg = "FileSystem does not appear to support flock; use SoftFileLock instead"
                    raise NotImplementedError(msg) from exception
            else:
                self._context.lock_file_fd = fd

        def _release(self) -> None:
            """
            Unlock and close the descriptor stored in ``self._context.lock_file_fd``.

            ``lock_file_fd`` is reset to ``None`` before unlocking, and the
            descriptor is closed even if the unlock call raises.
            """
            # Do not remove the lockfile:
            #   https://github.com/tox-dev/py-filelock/issues/31
            #   https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
            fd = cast("int", self._context.lock_file_fd)
            self._context.lock_file_fd = None
            try:
                fcntl.flock(fd, fcntl.LOCK_UN)
            finally:
                # Always give the descriptor back, even when unlocking failed.
                os.close(fd)

65 

66 

# Public names of this module (what ``from <module> import *`` exposes).
__all__ = ["UnixFileLock", "has_fcntl"]