Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 65%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# file.py -- Safe access to git files
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Safe access to git files."""
24__all__ = [
25 "FileLocked",
26 "GitFile",
27 "ensure_dir_exists",
28]
30import os
31import sys
32import warnings
33from collections.abc import Iterable, Iterator
34from types import TracebackType
35from typing import IO, Any, ClassVar, Literal, overload
37from ._typing import Buffer
39if sys.version_info >= (3, 11):
40 from typing import Self
41else:
42 from typing_extensions import Self
45def ensure_dir_exists(
46 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes],
47) -> None:
48 """Ensure a directory exists, creating if necessary."""
49 try:
50 os.makedirs(dirname)
51 except FileExistsError:
52 pass
55def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None:
56 """Rename file with temporary backup file to rollback if rename fails."""
57 if not os.path.exists(newname):
58 os.rename(oldname, newname)
59 return
61 # Defer the tempfile import since it pulls in a lot of other things.
62 import tempfile
64 # destination file exists
65 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".")
66 os.close(fd)
67 os.remove(tmpfile)
68 os.rename(newname, tmpfile)
69 try:
70 os.rename(oldname, newname)
71 except OSError:
72 os.rename(tmpfile, newname)
73 raise
74 os.remove(tmpfile)
77@overload
78def GitFile(
79 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
80 mode: Literal["wb"],
81 bufsize: int = -1,
82 mask: int = 0o644,
83 fsync: bool = True,
84) -> "_GitFile": ...
87@overload
88def GitFile(
89 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
90 mode: Literal["rb"] = "rb",
91 bufsize: int = -1,
92 mask: int = 0o644,
93 fsync: bool = True,
94) -> IO[bytes]: ...
97@overload
98def GitFile(
99 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
100 mode: str = "rb",
101 bufsize: int = -1,
102 mask: int = 0o644,
103 fsync: bool = True,
104) -> "IO[bytes] | _GitFile": ...
107def GitFile(
108 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
109 mode: str = "rb",
110 bufsize: int = -1,
111 mask: int = 0o644,
112 fsync: bool = True,
113) -> "IO[bytes] | _GitFile":
114 """Create a file object that obeys the git file locking protocol.
116 Returns: a builtin file object or a _GitFile object
118 Note: See _GitFile for a description of the file locking protocol.
120 Only read-only and write-only (binary) modes are supported; r+, w+, and a
121 are not. To read and write from the same file, you can take advantage of
122 the fact that opening a file for write does not actually open the file you
123 request.
125 The default file mask makes any created files user-writable and
126 world-readable.
128 Args:
129 filename: Path to the file
130 mode: File mode (only 'rb' and 'wb' are supported)
131 bufsize: Buffer size for file operations
132 mask: File mask for created files
133 fsync: Whether to call fsync() before closing (default: True)
135 """
136 if "a" in mode:
137 raise OSError("append mode not supported for Git files")
138 if "+" in mode:
139 raise OSError("read/write mode not supported for Git files")
140 if "b" not in mode:
141 raise OSError("text mode not supported for Git files")
142 if "w" in mode:
143 return _GitFile(filename, mode, bufsize, mask, fsync)
144 else:
145 return open(filename, mode, bufsize)
148class FileLocked(Exception):
149 """File is already locked."""
151 def __init__(
152 self,
153 filename: str | bytes,
154 lockfilename: str | bytes,
155 ) -> None:
156 """Initialize FileLocked.
158 Args:
159 filename: Name of the file that is locked
160 lockfilename: Name of the lock file
161 """
162 self.filename = filename
163 self.lockfilename = lockfilename
164 super().__init__(filename, lockfilename)
167class _GitFile(IO[bytes]):
168 """File that follows the git locking protocol for writes.
170 All writes to a file foo will be written into foo.lock in the same
171 directory, and the lockfile will be renamed to overwrite the original file
172 on close.
174 Note: You *must* call close() or abort() on a _GitFile for the lock to be
175 released. Typically this will happen in a finally block.
176 """
178 _file: IO[bytes]
179 _filename: str | bytes
180 _lockfilename: str | bytes
181 _closed: bool
183 PROXY_PROPERTIES: ClassVar[set[str]] = {
184 "encoding",
185 "errors",
186 "mode",
187 "name",
188 "newlines",
189 "softspace",
190 }
191 PROXY_METHODS: ClassVar[set[str]] = {
192 "__iter__",
193 "__next__",
194 "flush",
195 "fileno",
196 "isatty",
197 "read",
198 "readable",
199 "readline",
200 "readlines",
201 "seek",
202 "seekable",
203 "tell",
204 "truncate",
205 "writable",
206 "write",
207 "writelines",
208 }
210 def __init__(
211 self,
212 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
213 mode: str,
214 bufsize: int,
215 mask: int,
216 fsync: bool = True,
217 ) -> None:
218 # Convert PathLike to str/bytes for our internal use
219 self._filename: str | bytes = os.fspath(filename)
220 self._fsync = fsync
221 if isinstance(self._filename, bytes):
222 self._lockfilename: str | bytes = self._filename + b".lock"
223 else:
224 self._lockfilename = self._filename + ".lock"
225 try:
226 fd = os.open(
227 self._lockfilename,
228 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
229 mask,
230 )
231 except FileExistsError as exc:
232 raise FileLocked(self._filename, self._lockfilename) from exc
233 self._file = os.fdopen(fd, mode, bufsize)
234 self._closed = False
236 def __iter__(self) -> Iterator[bytes]:
237 """Iterate over lines in the file."""
238 return iter(self._file)
240 def abort(self) -> None:
241 """Close and discard the lockfile without overwriting the target.
243 If the file is already closed, this is a no-op.
244 """
245 if self._closed:
246 return
247 self._file.close()
248 try:
249 os.remove(self._lockfilename)
250 self._closed = True
251 except FileNotFoundError:
252 # The file may have been removed already, which is ok.
253 self._closed = True
255 def close(self) -> None:
256 """Close this file, saving the lockfile over the original.
258 Note: If this method fails, it will attempt to delete the lockfile.
259 However, it is not guaranteed to do so (e.g. if a filesystem
260 becomes suddenly read-only), which will prevent future writes to
261 this file until the lockfile is removed manually.
263 Raises:
264 OSError: if the original file could not be overwritten. The
265 lock file is still closed, so further attempts to write to the same
266 file object will raise ValueError.
267 """
268 if self._closed:
269 return
270 self._file.flush()
271 if self._fsync:
272 os.fsync(self._file.fileno())
273 self._file.close()
274 try:
275 if getattr(os, "replace", None) is not None:
276 os.replace(self._lockfilename, self._filename)
277 else:
278 if sys.platform != "win32":
279 os.rename(self._lockfilename, self._filename)
280 else:
281 # Windows versions prior to Vista don't support atomic
282 # renames
283 _fancy_rename(self._lockfilename, self._filename)
284 finally:
285 self.abort()
287 def __del__(self) -> None:
288 if not getattr(self, "_closed", True):
289 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
290 self.abort()
292 def __enter__(self) -> Self:
293 return self
295 def __exit__(
296 self,
297 exc_type: type[BaseException] | None,
298 exc_val: BaseException | None,
299 exc_tb: TracebackType | None,
300 ) -> None:
301 if exc_type is not None:
302 self.abort()
303 else:
304 self.close()
306 def __fspath__(self) -> str | bytes:
307 """Return the file path for os.fspath() compatibility."""
308 return self._filename
310 @property
311 def closed(self) -> bool:
312 """Return whether the file is closed."""
313 return self._closed
315 def __getattr__(self, name: str) -> Any: # noqa: ANN401
316 """Proxy property calls to the underlying file."""
317 if name in self.PROXY_PROPERTIES:
318 return getattr(self._file, name)
319 raise AttributeError(name)
321 # Implement IO[bytes] methods by delegating to the underlying file
322 def read(self, size: int = -1) -> bytes:
323 return self._file.read(size)
325 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
326 # Python 3.10 has issues with IO[bytes] overload signatures
327 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore]
328 return self._file.write(data)
330 def readline(self, size: int = -1) -> bytes:
331 return self._file.readline(size)
333 def readlines(self, hint: int = -1) -> list[bytes]:
334 return self._file.readlines(hint)
336 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
337 # Python 3.10 has issues with IO[bytes] overload signatures
338 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore]
339 return self._file.writelines(lines)
341 def seek(self, offset: int, whence: int = 0) -> int:
342 return self._file.seek(offset, whence)
344 def tell(self) -> int:
345 return self._file.tell()
347 def flush(self) -> None:
348 return self._file.flush()
350 def truncate(self, size: int | None = None) -> int:
351 return self._file.truncate(size)
353 def fileno(self) -> int:
354 return self._file.fileno()
356 def isatty(self) -> bool:
357 return self._file.isatty()
359 def readable(self) -> bool:
360 return self._file.readable()
362 def writable(self) -> bool:
363 return self._file.writable()
365 def seekable(self) -> bool:
366 return self._file.seekable()
368 def __next__(self) -> bytes:
369 return next(iter(self._file))