Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 65%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# file.py -- Safe access to git files
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Safe access to git files."""
24__all__ = [
25 "FileLocked",
26 "GitFile",
27 "ensure_dir_exists",
28]
30import os
31import sys
32import warnings
33from collections.abc import Iterable, Iterator
34from types import TracebackType
35from typing import IO, Any, ClassVar, Literal, overload
37from ._typing import Buffer
40def ensure_dir_exists(
41 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes],
42) -> None:
43 """Ensure a directory exists, creating if necessary."""
44 try:
45 os.makedirs(dirname)
46 except FileExistsError:
47 pass
50def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None:
51 """Rename file with temporary backup file to rollback if rename fails."""
52 if not os.path.exists(newname):
53 os.rename(oldname, newname)
54 return
56 # Defer the tempfile import since it pulls in a lot of other things.
57 import tempfile
59 # destination file exists
60 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".")
61 os.close(fd)
62 os.remove(tmpfile)
63 os.rename(newname, tmpfile)
64 try:
65 os.rename(oldname, newname)
66 except OSError:
67 os.rename(tmpfile, newname)
68 raise
69 os.remove(tmpfile)
72@overload
73def GitFile(
74 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
75 mode: Literal["wb"],
76 bufsize: int = -1,
77 mask: int = 0o644,
78 fsync: bool = True,
79) -> "_GitFile": ...
82@overload
83def GitFile(
84 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
85 mode: Literal["rb"] = "rb",
86 bufsize: int = -1,
87 mask: int = 0o644,
88 fsync: bool = True,
89) -> IO[bytes]: ...
92@overload
93def GitFile(
94 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
95 mode: str = "rb",
96 bufsize: int = -1,
97 mask: int = 0o644,
98 fsync: bool = True,
99) -> "IO[bytes] | _GitFile": ...
102def GitFile(
103 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
104 mode: str = "rb",
105 bufsize: int = -1,
106 mask: int = 0o644,
107 fsync: bool = True,
108) -> "IO[bytes] | _GitFile":
109 """Create a file object that obeys the git file locking protocol.
111 Returns: a builtin file object or a _GitFile object
113 Note: See _GitFile for a description of the file locking protocol.
115 Only read-only and write-only (binary) modes are supported; r+, w+, and a
116 are not. To read and write from the same file, you can take advantage of
117 the fact that opening a file for write does not actually open the file you
118 request.
120 The default file mask makes any created files user-writable and
121 world-readable.
123 Args:
124 filename: Path to the file
125 mode: File mode (only 'rb' and 'wb' are supported)
126 bufsize: Buffer size for file operations
127 mask: File mask for created files
128 fsync: Whether to call fsync() before closing (default: True)
130 """
131 if "a" in mode:
132 raise OSError("append mode not supported for Git files")
133 if "+" in mode:
134 raise OSError("read/write mode not supported for Git files")
135 if "b" not in mode:
136 raise OSError("text mode not supported for Git files")
137 if "w" in mode:
138 return _GitFile(filename, mode, bufsize, mask, fsync)
139 else:
140 return open(filename, mode, bufsize)
143class FileLocked(Exception):
144 """File is already locked."""
146 def __init__(
147 self,
148 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
149 lockfilename: str | bytes,
150 ) -> None:
151 """Initialize FileLocked.
153 Args:
154 filename: Name of the file that is locked
155 lockfilename: Name of the lock file
156 """
157 self.filename = filename
158 self.lockfilename = lockfilename
159 super().__init__(filename, lockfilename)
162class _GitFile(IO[bytes]):
163 """File that follows the git locking protocol for writes.
165 All writes to a file foo will be written into foo.lock in the same
166 directory, and the lockfile will be renamed to overwrite the original file
167 on close.
169 Note: You *must* call close() or abort() on a _GitFile for the lock to be
170 released. Typically this will happen in a finally block.
171 """
173 _file: IO[bytes]
174 _filename: str | bytes
175 _lockfilename: str | bytes
176 _closed: bool
178 PROXY_PROPERTIES: ClassVar[set[str]] = {
179 "encoding",
180 "errors",
181 "mode",
182 "name",
183 "newlines",
184 "softspace",
185 }
186 PROXY_METHODS: ClassVar[set[str]] = {
187 "__iter__",
188 "__next__",
189 "flush",
190 "fileno",
191 "isatty",
192 "read",
193 "readable",
194 "readline",
195 "readlines",
196 "seek",
197 "seekable",
198 "tell",
199 "truncate",
200 "writable",
201 "write",
202 "writelines",
203 }
205 def __init__(
206 self,
207 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes],
208 mode: str,
209 bufsize: int,
210 mask: int,
211 fsync: bool = True,
212 ) -> None:
213 # Convert PathLike to str/bytes for our internal use
214 self._filename: str | bytes = os.fspath(filename)
215 self._fsync = fsync
216 if isinstance(self._filename, bytes):
217 self._lockfilename: str | bytes = self._filename + b".lock"
218 else:
219 self._lockfilename = self._filename + ".lock"
220 try:
221 fd = os.open(
222 self._lockfilename,
223 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
224 mask,
225 )
226 except FileExistsError as exc:
227 raise FileLocked(filename, self._lockfilename) from exc
228 self._file = os.fdopen(fd, mode, bufsize)
229 self._closed = False
231 def __iter__(self) -> Iterator[bytes]:
232 """Iterate over lines in the file."""
233 return iter(self._file)
235 def abort(self) -> None:
236 """Close and discard the lockfile without overwriting the target.
238 If the file is already closed, this is a no-op.
239 """
240 if self._closed:
241 return
242 self._file.close()
243 try:
244 os.remove(self._lockfilename)
245 self._closed = True
246 except FileNotFoundError:
247 # The file may have been removed already, which is ok.
248 self._closed = True
250 def close(self) -> None:
251 """Close this file, saving the lockfile over the original.
253 Note: If this method fails, it will attempt to delete the lockfile.
254 However, it is not guaranteed to do so (e.g. if a filesystem
255 becomes suddenly read-only), which will prevent future writes to
256 this file until the lockfile is removed manually.
258 Raises:
259 OSError: if the original file could not be overwritten. The
260 lock file is still closed, so further attempts to write to the same
261 file object will raise ValueError.
262 """
263 if self._closed:
264 return
265 self._file.flush()
266 if self._fsync:
267 os.fsync(self._file.fileno())
268 self._file.close()
269 try:
270 if getattr(os, "replace", None) is not None:
271 os.replace(self._lockfilename, self._filename)
272 else:
273 if sys.platform != "win32":
274 os.rename(self._lockfilename, self._filename)
275 else:
276 # Windows versions prior to Vista don't support atomic
277 # renames
278 _fancy_rename(self._lockfilename, self._filename)
279 finally:
280 self.abort()
282 def __del__(self) -> None:
283 if not getattr(self, "_closed", True):
284 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
285 self.abort()
287 def __enter__(self) -> "_GitFile":
288 return self
290 def __exit__(
291 self,
292 exc_type: type[BaseException] | None,
293 exc_val: BaseException | None,
294 exc_tb: TracebackType | None,
295 ) -> None:
296 if exc_type is not None:
297 self.abort()
298 else:
299 self.close()
301 def __fspath__(self) -> str | bytes:
302 """Return the file path for os.fspath() compatibility."""
303 return self._filename
305 @property
306 def closed(self) -> bool:
307 """Return whether the file is closed."""
308 return self._closed
310 def __getattr__(self, name: str) -> Any: # noqa: ANN401
311 """Proxy property calls to the underlying file."""
312 if name in self.PROXY_PROPERTIES:
313 return getattr(self._file, name)
314 raise AttributeError(name)
316 # Implement IO[bytes] methods by delegating to the underlying file
317 def read(self, size: int = -1) -> bytes:
318 return self._file.read(size)
320 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
321 # Python 3.10 has issues with IO[bytes] overload signatures
322 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore]
323 return self._file.write(data)
325 def readline(self, size: int = -1) -> bytes:
326 return self._file.readline(size)
328 def readlines(self, hint: int = -1) -> list[bytes]:
329 return self._file.readlines(hint)
331 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
332 # Python 3.10 has issues with IO[bytes] overload signatures
333 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore]
334 return self._file.writelines(lines)
336 def seek(self, offset: int, whence: int = 0) -> int:
337 return self._file.seek(offset, whence)
339 def tell(self) -> int:
340 return self._file.tell()
342 def flush(self) -> None:
343 return self._file.flush()
345 def truncate(self, size: int | None = None) -> int:
346 return self._file.truncate(size)
348 def fileno(self) -> int:
349 return self._file.fileno()
351 def isatty(self) -> bool:
352 return self._file.isatty()
354 def readable(self) -> bool:
355 return self._file.readable()
357 def writable(self) -> bool:
358 return self._file.writable()
360 def seekable(self) -> bool:
361 return self._file.seekable()
363 def __next__(self) -> bytes:
364 return next(iter(self._file))