Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# file.py -- Safe access to git files
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Safe access to git files."""
24import os
25import sys
26import warnings
27from collections.abc import Iterator
28from types import TracebackType
29from typing import IO, Any, ClassVar, Literal, Optional, Union, overload
def ensure_dir_exists(dirname: Union[str, bytes, os.PathLike]) -> None:
    """Create ``dirname``, including missing parents, unless it is already there.

    Args:
      dirname: Path of the directory that should exist afterwards.

    A ``FileExistsError`` raised by ``os.makedirs`` is treated as success;
    any other error propagates to the caller.
    """
    try:
        os.makedirs(dirname)
    except FileExistsError:
        # Something already occupies this path; there is nothing left to do.
        return
def _fancy_rename(oldname: Union[str, bytes], newname: Union[str, bytes]) -> None:
    """Rename ``oldname`` to ``newname``, rolling back if the rename fails.

    When ``newname`` does not exist this is a plain ``os.rename``. Otherwise
    the existing destination is first moved aside to a uniquely named backup,
    the rename is attempted, and the backup is either restored (on failure)
    or deleted (on success).

    Args:
      oldname: Path of the file to move.
      newname: Path the file should end up at.

    Raises:
      OSError: if the rename fails; the previous destination is restored
        before the error is re-raised.
    """
    if not os.path.exists(newname):
        os.rename(oldname, newname)
        return

    # Defer the tempfile import since it pulls in a lot of other things.
    import tempfile

    # Destination file exists. Build the backup name from decoded text so
    # bytes paths do not turn into "b'...'" prefixes, and place the backup in
    # the destination's own directory so moving it aside never has to cross
    # a filesystem boundary.
    old_text = os.fsdecode(oldname)
    new_text = os.fsdecode(newname)
    (fd, tmpfile) = tempfile.mkstemp(
        suffix=".tmp",
        prefix=os.path.basename(old_text),
        dir=os.path.dirname(new_text) or ".",
    )
    os.close(fd)
    # Only the unique name is needed; the placeholder itself must go because
    # os.rename refuses to overwrite an existing file on Windows.
    os.remove(tmpfile)
    os.rename(newname, tmpfile)
    try:
        os.rename(oldname, newname)
    except OSError:
        # Put the previous destination back before reporting the failure.
        os.rename(tmpfile, newname)
        raise
    os.remove(tmpfile)
@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: Literal["wb"],
    bufsize: int = -1,
    mask: int = 0o644,
) -> "_GitFile": ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: Literal["rb"] = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> IO[bytes]: ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]: ...


def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]:
    """Open a file in a way that respects the git file locking protocol.

    Args:
      filename: Path of the file to open.
      mode: Binary read or binary write mode. Append ("a"), update ("+")
        and text modes are rejected.
      bufsize: Buffer size handed to the underlying open call.
      mask: Permission bits used when a write creates the file. The default
        makes created files user-writable and world-readable.

    Returns: a builtin file object for reads, or a _GitFile object for writes

    Raises:
      OSError: if ``mode`` requests append, read/write or text access.

    Note: See _GitFile for a description of the file locking protocol.

    To read and write the same file, rely on the fact that opening it for
    write does not actually open the file you request.
    """
    # Checked in this order so that a mode with several problems always
    # reports the same one.
    rejected = (
        ("a", "append mode not supported for Git files"),
        ("+", "read/write mode not supported for Git files"),
    )
    for marker, message in rejected:
        if marker in mode:
            raise OSError(message)
    if "b" not in mode:
        raise OSError("text mode not supported for Git files")

    if "w" not in mode:
        # Reads need no locking; hand back an ordinary file object.
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask)
class FileLocked(Exception):
    """Signals that a file is already locked.

    Both paths are kept as attributes and are also passed to ``Exception``
    so they appear in ``args``.
    """

    def __init__(
        self, filename: Union[str, bytes, os.PathLike], lockfilename: Union[str, bytes]
    ) -> None:
        super().__init__(filename, lockfilename)
        # Path the caller asked for.
        self.filename = filename
        # Path of the lock file recorded alongside it.
        self.lockfilename = lockfilename
class _GitFile:
    """File that follows the git locking protocol for writes.

    All writes to a file foo will be written into foo.lock in the same
    directory, and the lockfile will be renamed to overwrite the original file
    on close.

    Note: You *must* call close() or abort() on a _GitFile for the lock to be
        released. Typically this will happen in a finally block.
    """

    # Attribute names that __getattr__ forwards to the underlying file object.
    PROXY_PROPERTIES: ClassVar[set[str]] = {
        "closed",
        "encoding",
        "errors",
        "mode",
        "name",
        "newlines",
        "softspace",
    }
    # Bound methods of the underlying file object that are copied onto each
    # instance at construction time.
    PROXY_METHODS: ClassVar[set[str]] = {
        "__iter__",
        "flush",
        "fileno",
        "isatty",
        "read",
        "readline",
        "readlines",
        "seek",
        "tell",
        "truncate",
        "write",
        "writelines",
    }

    def __init__(
        self,
        filename: Union[str, bytes, os.PathLike],
        mode: str,
        bufsize: int,
        mask: int,
    ) -> None:
        """Acquire ``<filename>.lock`` and open it for writing.

        Args:
          filename: Path of the file that will be replaced on close().
          mode: Mode string passed to ``os.fdopen`` for the lock file.
          bufsize: Buffer size passed to ``os.fdopen``.
          mask: Permission bits used when creating the lock file.

        Raises:
          FileLocked: if the lock file already exists.
        """
        # Convert PathLike to str/bytes for our internal use
        self._filename: Union[str, bytes] = os.fspath(filename)
        if isinstance(self._filename, bytes):
            self._lockfilename: Union[str, bytes] = self._filename + b".lock"
        else:
            self._lockfilename = self._filename + ".lock"
        try:
            # O_EXCL makes creation fail if the lock file is already present.
            fd = os.open(
                self._lockfilename,
                os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
                mask,
            )
        except FileExistsError as exc:
            raise FileLocked(filename, self._lockfilename) from exc
        try:
            self._file = os.fdopen(fd, mode, bufsize)
        except BaseException:
            # The lock was just created by us. If wrapping the descriptor
            # fails (e.g. an invalid mode string), release both the descriptor
            # and the lock file so the target does not stay locked forever.
            try:
                os.close(fd)
            except OSError:
                # The descriptor may already have been closed by the failed
                # open attempt.
                pass
            try:
                os.remove(self._lockfilename)
            except OSError:
                pass
            raise
        self._closed = False

        for method in self.PROXY_METHODS:
            setattr(self, method, getattr(self._file, method))

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over lines in the file."""
        return iter(self._file)

    def abort(self) -> None:
        """Close and discard the lockfile without overwriting the target.

        If the file is already closed, this is a no-op.
        """
        if self._closed:
            return
        try:
            self._file.close()
        finally:
            # Remove the lock even when closing the handle raised, so a
            # failed flush does not leave the target locked.
            try:
                os.remove(self._lockfilename)
            except FileNotFoundError:
                # The file may have been removed already, which is ok.
                pass
            # Only reached when the lock file is gone; any other removal
            # error propagates and leaves the object abortable again.
            self._closed = True

    def close(self) -> None:
        """Close this file, saving the lockfile over the original.

        Note: If this method fails, it will attempt to delete the lockfile.
            However, it is not guaranteed to do so (e.g. if a filesystem
            becomes suddenly read-only), which will prevent future writes to
            this file until the lockfile is removed manually.

        Raises:
          OSError: if the original file could not be overwritten. The
            lock file is still closed, so further attempts to write to the same
            file object will raise ValueError.
        """
        if self._closed:
            return
        try:
            self._file.flush()
            os.fsync(self._file.fileno())
            self._file.close()
            # os.replace overwrites an existing destination on every
            # supported platform.
            os.replace(self._lockfilename, self._filename)
        except BaseException:
            # Anything that went wrong before the rename completed means the
            # lock file is still ours; discard it.
            self.abort()
            raise
        # After a successful rename the lock path no longer belongs to this
        # object, so it must not be removed here: another writer may already
        # have created a new lock at the same path.
        self._closed = True

    def __del__(self) -> None:
        """Warn about and abort a file that was never closed."""
        # _closed is missing when __init__ failed before acquiring the lock.
        if not getattr(self, "_closed", True):
            warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
            self.abort()

    def __enter__(self) -> "_GitFile":
        """Return this object for use in a ``with`` block."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Commit the file on a clean exit, discard it if the block raised."""
        if exc_type is not None:
            self.abort()
        else:
            self.close()

    def __getattr__(self, name: str) -> Any:  # noqa: ANN401
        """Proxy property calls to the underlying file."""
        if name in self.PROXY_PROPERTIES:
            return getattr(self._file, name)
        raise AttributeError(name)

    def readable(self) -> bool:
        """Return whether the file is readable."""
        return self._file.readable()

    def writable(self) -> bool:
        """Return whether the file is writable."""
        return self._file.writable()

    def seekable(self) -> bool:
        """Return whether the file is seekable."""
        return self._file.seekable()