Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# file.py -- Safe access to git files
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Safe access to git files."""
24import os
25import sys
26import warnings
27from collections.abc import Iterable, Iterator
28from types import TracebackType
29from typing import IO, Any, ClassVar, Literal, overload
31from ._typing import Buffer
34def ensure_dir_exists(
35 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes],
36) -> None:
37 """Ensure a directory exists, creating if necessary."""
38 try:
39 os.makedirs(dirname)
40 except FileExistsError:
41 pass
44def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None:
45 """Rename file with temporary backup file to rollback if rename fails."""
46 if not os.path.exists(newname):
47 os.rename(oldname, newname)
48 return
50 # Defer the tempfile import since it pulls in a lot of other things.
51 import tempfile
53 # destination file exists
54 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".")
55 os.close(fd)
56 os.remove(tmpfile)
57 os.rename(newname, tmpfile)
58 try:
59 os.rename(oldname, newname)
60 except OSError:
61 os.rename(tmpfile, newname)
62 raise
63 os.remove(tmpfile)
66@overload
67def GitFile(
68 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
69 mode: Literal["wb"],
70 bufsize: int = -1,
71 mask: int = 0o644,
72 fsync: bool = True,
73) -> "_GitFile": ...
76@overload
77def GitFile(
78 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
79 mode: Literal["rb"] = "rb",
80 bufsize: int = -1,
81 mask: int = 0o644,
82 fsync: bool = True,
83) -> IO[bytes]: ...
86@overload
87def GitFile(
88 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
89 mode: str = "rb",
90 bufsize: int = -1,
91 mask: int = 0o644,
92 fsync: bool = True,
93) -> "IO[bytes] | _GitFile": ...
96def GitFile(
97 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
98 mode: str = "rb",
99 bufsize: int = -1,
100 mask: int = 0o644,
101 fsync: bool = True,
102) -> "IO[bytes] | _GitFile":
103 """Create a file object that obeys the git file locking protocol.
105 Returns: a builtin file object or a _GitFile object
107 Note: See _GitFile for a description of the file locking protocol.
109 Only read-only and write-only (binary) modes are supported; r+, w+, and a
110 are not. To read and write from the same file, you can take advantage of
111 the fact that opening a file for write does not actually open the file you
112 request.
114 The default file mask makes any created files user-writable and
115 world-readable.
117 Args:
118 filename: Path to the file
119 mode: File mode (only 'rb' and 'wb' are supported)
120 bufsize: Buffer size for file operations
121 mask: File mask for created files
122 fsync: Whether to call fsync() before closing (default: True)
124 """
125 if "a" in mode:
126 raise OSError("append mode not supported for Git files")
127 if "+" in mode:
128 raise OSError("read/write mode not supported for Git files")
129 if "b" not in mode:
130 raise OSError("text mode not supported for Git files")
131 if "w" in mode:
132 return _GitFile(filename, mode, bufsize, mask, fsync)
133 else:
134 return open(filename, mode, bufsize)
137class FileLocked(Exception):
138 """File is already locked."""
140 def __init__(
141 self,
142 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
143 lockfilename: str | bytes,
144 ) -> None:
145 """Initialize FileLocked.
147 Args:
148 filename: Name of the file that is locked
149 lockfilename: Name of the lock file
150 """
151 self.filename = filename
152 self.lockfilename = lockfilename
153 super().__init__(filename, lockfilename)
156class _GitFile(IO[bytes]):
157 """File that follows the git locking protocol for writes.
159 All writes to a file foo will be written into foo.lock in the same
160 directory, and the lockfile will be renamed to overwrite the original file
161 on close.
163 Note: You *must* call close() or abort() on a _GitFile for the lock to be
164 released. Typically this will happen in a finally block.
165 """
167 _file: IO[bytes]
168 _filename: str | bytes
169 _lockfilename: str | bytes
170 _closed: bool
172 PROXY_PROPERTIES: ClassVar[set[str]] = {
173 "encoding",
174 "errors",
175 "mode",
176 "name",
177 "newlines",
178 "softspace",
179 }
180 PROXY_METHODS: ClassVar[set[str]] = {
181 "__iter__",
182 "__next__",
183 "flush",
184 "fileno",
185 "isatty",
186 "read",
187 "readable",
188 "readline",
189 "readlines",
190 "seek",
191 "seekable",
192 "tell",
193 "truncate",
194 "writable",
195 "write",
196 "writelines",
197 }
199 def __init__(
200 self,
201 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
202 mode: str,
203 bufsize: int,
204 mask: int,
205 fsync: bool = True,
206 ) -> None:
207 # Convert PathLike to str/bytes for our internal use
208 self._filename: str | bytes = os.fspath(filename)
209 self._fsync = fsync
210 if isinstance(self._filename, bytes):
211 self._lockfilename: str | bytes = self._filename + b".lock"
212 else:
213 self._lockfilename = self._filename + ".lock"
214 try:
215 fd = os.open(
216 self._lockfilename,
217 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
218 mask,
219 )
220 except FileExistsError as exc:
221 raise FileLocked(filename, self._lockfilename) from exc
222 self._file = os.fdopen(fd, mode, bufsize)
223 self._closed = False
225 def __iter__(self) -> Iterator[bytes]:
226 """Iterate over lines in the file."""
227 return iter(self._file)
229 def abort(self) -> None:
230 """Close and discard the lockfile without overwriting the target.
232 If the file is already closed, this is a no-op.
233 """
234 if self._closed:
235 return
236 self._file.close()
237 try:
238 os.remove(self._lockfilename)
239 self._closed = True
240 except FileNotFoundError:
241 # The file may have been removed already, which is ok.
242 self._closed = True
244 def close(self) -> None:
245 """Close this file, saving the lockfile over the original.
247 Note: If this method fails, it will attempt to delete the lockfile.
248 However, it is not guaranteed to do so (e.g. if a filesystem
249 becomes suddenly read-only), which will prevent future writes to
250 this file until the lockfile is removed manually.
252 Raises:
253 OSError: if the original file could not be overwritten. The
254 lock file is still closed, so further attempts to write to the same
255 file object will raise ValueError.
256 """
257 if self._closed:
258 return
259 self._file.flush()
260 if self._fsync:
261 os.fsync(self._file.fileno())
262 self._file.close()
263 try:
264 if getattr(os, "replace", None) is not None:
265 os.replace(self._lockfilename, self._filename)
266 else:
267 if sys.platform != "win32":
268 os.rename(self._lockfilename, self._filename)
269 else:
270 # Windows versions prior to Vista don't support atomic
271 # renames
272 _fancy_rename(self._lockfilename, self._filename)
273 finally:
274 self.abort()
276 def __del__(self) -> None:
277 if not getattr(self, "_closed", True):
278 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
279 self.abort()
281 def __enter__(self) -> "_GitFile":
282 return self
284 def __exit__(
285 self,
286 exc_type: type[BaseException] | None,
287 exc_val: BaseException | None,
288 exc_tb: TracebackType | None,
289 ) -> None:
290 if exc_type is not None:
291 self.abort()
292 else:
293 self.close()
295 def __fspath__(self) -> str | bytes:
296 """Return the file path for os.fspath() compatibility."""
297 return self._filename
299 @property
300 def closed(self) -> bool:
301 """Return whether the file is closed."""
302 return self._closed
304 def __getattr__(self, name: str) -> Any: # noqa: ANN401
305 """Proxy property calls to the underlying file."""
306 if name in self.PROXY_PROPERTIES:
307 return getattr(self._file, name)
308 raise AttributeError(name)
310 # Implement IO[bytes] methods by delegating to the underlying file
311 def read(self, size: int = -1) -> bytes:
312 return self._file.read(size)
314 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
315 # Python 3.10 has issues with IO[bytes] overload signatures
316 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore]
317 return self._file.write(data)
319 def readline(self, size: int = -1) -> bytes:
320 return self._file.readline(size)
322 def readlines(self, hint: int = -1) -> list[bytes]:
323 return self._file.readlines(hint)
325 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
326 # Python 3.10 has issues with IO[bytes] overload signatures
327 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore]
328 return self._file.writelines(lines)
330 def seek(self, offset: int, whence: int = 0) -> int:
331 return self._file.seek(offset, whence)
333 def tell(self) -> int:
334 return self._file.tell()
336 def flush(self) -> None:
337 return self._file.flush()
339 def truncate(self, size: int | None = None) -> int:
340 return self._file.truncate(size)
342 def fileno(self) -> int:
343 return self._file.fileno()
345 def isatty(self) -> bool:
346 return self._file.isatty()
348 def readable(self) -> bool:
349 return self._file.readable()
351 def writable(self) -> bool:
352 return self._file.writable()
354 def seekable(self) -> bool:
355 return self._file.seekable()
357 def __next__(self) -> bytes:
358 return next(iter(self._file))