Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/filelock/_windows.py: 30%
40 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
1from __future__ import annotations
3import os
4import sys
5from errno import EACCES
6from typing import cast
8from ._api import BaseFileLock
9from ._util import raise_on_not_writable_file
if sys.platform == "win32":  # pragma: win32 cover
    import msvcrt

    class WindowsFileLock(BaseFileLock):
        """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""

        def _acquire(self) -> None:
            """Make a single, non-blocking attempt to take the lock.

            On success the open file descriptor is stored in
            ``self._context.lock_file_fd``. If the file cannot be opened or
            locked because of ``EACCES``, the method returns without storing a
            descriptor, i.e. the lock is simply not acquired by this attempt.

            :raises OSError: for any open/lock failure other than ``EACCES``.
            """
            # Helper from ._util; judging by its name it raises when the lock
            # file is not writable -- its implementation is not visible here.
            raise_on_not_writable_file(self.lock_file)
            flags = (
                os.O_RDWR  # open for read and write
                | os.O_CREAT  # create file if not exists
                | os.O_TRUNC  # truncate file if not empty
            )
            try:
                fd = os.open(self.lock_file, flags, self._context.mode)
            except OSError as exception:
                if exception.errno != EACCES:  # has no access to this lock
                    raise
            else:
                try:
                    # LK_NBLCK: lock 1 byte without blocking; fails with
                    # OSError if the region cannot be locked right now.
                    msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
                except OSError as exception:
                    os.close(fd)  # close file first
                    if exception.errno != EACCES:  # file is already locked
                        raise
                else:
                    self._context.lock_file_fd = fd

        def _release(self) -> None:
            """Unlock and close the lock file, then try to delete it.

            The stored descriptor is cleared from ``self._context`` before any
            system call is made. The descriptor is closed even if unlocking
            fails, so it is never leaked. Removing the lock file is
            best-effort: an :class:`OSError` from ``os.remove`` is ignored.

            :raises OSError: if unlocking or closing the descriptor fails.
            """
            fd = cast(int, self._context.lock_file_fd)
            self._context.lock_file_fd = None
            try:
                msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
            finally:
                # Always close: the context no longer references fd, so nothing
                # else could close it if the unlock call raised.
                os.close(fd)

            try:
                os.remove(self.lock_file)
            # Probably another instance of the application has acquired the file lock.
            except OSError:
                pass

else:  # pragma: win32 no cover

    class WindowsFileLock(BaseFileLock):
        """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""

        def _acquire(self) -> None:
            """Not available off Windows; always raises :class:`NotImplementedError`."""
            raise NotImplementedError

        def _release(self) -> None:
            """Not available off Windows; always raises :class:`NotImplementedError`."""
            raise NotImplementedError
63__all__ = [
64 "WindowsFileLock",
65]