Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# file.py -- Safe access to git files
2# Copyright (C) 2010 Google, Inc.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Safe access to git files."""
24import os
25import sys
26import warnings
27from typing import ClassVar, Union
def ensure_dir_exists(dirname) -> None:
    """Ensure a directory exists, creating it (and any parents) if necessary.

    Args:
      dirname: Path of the directory that must exist afterwards.

    Raises:
      FileExistsError: if ``dirname`` already exists but is not a directory.
      OSError: if the directory could not be created for any other reason.
    """
    # exist_ok=True tolerates an already-present directory but, unlike
    # swallowing every FileExistsError, still reports the case where a
    # non-directory occupies the path.
    os.makedirs(dirname, exist_ok=True)
38def _fancy_rename(oldname, newname) -> None:
39 """Rename file with temporary backup file to rollback if rename fails."""
40 if not os.path.exists(newname):
41 try:
42 os.rename(oldname, newname)
43 except OSError:
44 raise
45 return
47 # Defer the tempfile import since it pulls in a lot of other things.
48 import tempfile
50 # destination file exists
51 try:
52 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=oldname, dir=".")
53 os.close(fd)
54 os.remove(tmpfile)
55 except OSError:
56 # either file could not be created (e.g. permission problem)
57 # or could not be deleted (e.g. rude virus scanner)
58 raise
59 try:
60 os.rename(newname, tmpfile)
61 except OSError:
62 raise # no rename occurred
63 try:
64 os.rename(oldname, newname)
65 except OSError:
66 os.rename(tmpfile, newname)
67 raise
68 os.remove(tmpfile)
def GitFile(
    filename: Union[str, bytes, os.PathLike], mode="rb", bufsize=-1, mask=0o644
):
    """Open a file in a way that honours the git file locking protocol.

    Returns: a builtin file object (for reads) or a _GitFile object (for
      writes)

    Note: See _GitFile for a description of the file locking protocol.

    Only binary read-only and write-only modes are accepted; r+, w+ and a
    are rejected. Reading and writing the same file is still possible
    because opening for write does not actually open the requested file.

    With the default mask, files that get created are user-writable and
    world-readable.

    Raises:
      OSError: if ``mode`` requests append, read/write or text access.
    """
    # Checked in this order so the reported reason matches the first
    # offending flag found.
    rejected_flags = (
        ("a", "append mode not supported for Git files"),
        ("+", "read/write mode not supported for Git files"),
    )
    for flag, reason in rejected_flags:
        if flag in mode:
            raise OSError(reason)
    if "b" not in mode:
        raise OSError("text mode not supported for Git files")

    # Reads need no locking and go straight to the builtin open().
    if "w" not in mode:
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask)
class FileLocked(Exception):
    """Signals that a file is already locked.

    The target path and the path of its lock file are kept both as
    attributes and as the exception's ``args``.
    """

    def __init__(self, filename, lockfilename) -> None:
        super().__init__(filename, lockfilename)
        self.lockfilename = lockfilename
        self.filename = filename
110class _GitFile:
111 """File that follows the git locking protocol for writes.
113 All writes to a file foo will be written into foo.lock in the same
114 directory, and the lockfile will be renamed to overwrite the original file
115 on close.
117 Note: You *must* call close() or abort() on a _GitFile for the lock to be
118 released. Typically this will happen in a finally block.
119 """
121 PROXY_PROPERTIES: ClassVar[set[str]] = {
122 "closed",
123 "encoding",
124 "errors",
125 "mode",
126 "name",
127 "newlines",
128 "softspace",
129 }
130 PROXY_METHODS: ClassVar[set[str]] = {
131 "__iter__",
132 "flush",
133 "fileno",
134 "isatty",
135 "read",
136 "readline",
137 "readlines",
138 "seek",
139 "tell",
140 "truncate",
141 "write",
142 "writelines",
143 }
145 def __init__(
146 self, filename: Union[str, bytes, os.PathLike], mode, bufsize, mask
147 ) -> None:
148 # Convert PathLike to str/bytes for our internal use
149 self._filename: Union[str, bytes] = os.fspath(filename)
150 if isinstance(self._filename, bytes):
151 self._lockfilename: Union[str, bytes] = self._filename + b".lock"
152 else:
153 self._lockfilename = self._filename + ".lock"
154 try:
155 fd = os.open(
156 self._lockfilename,
157 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
158 mask,
159 )
160 except FileExistsError as exc:
161 raise FileLocked(filename, self._lockfilename) from exc
162 self._file = os.fdopen(fd, mode, bufsize)
163 self._closed = False
165 for method in self.PROXY_METHODS:
166 setattr(self, method, getattr(self._file, method))
168 def abort(self) -> None:
169 """Close and discard the lockfile without overwriting the target.
171 If the file is already closed, this is a no-op.
172 """
173 if self._closed:
174 return
175 self._file.close()
176 try:
177 os.remove(self._lockfilename)
178 self._closed = True
179 except FileNotFoundError:
180 # The file may have been removed already, which is ok.
181 self._closed = True
183 def close(self) -> None:
184 """Close this file, saving the lockfile over the original.
186 Note: If this method fails, it will attempt to delete the lockfile.
187 However, it is not guaranteed to do so (e.g. if a filesystem
188 becomes suddenly read-only), which will prevent future writes to
189 this file until the lockfile is removed manually.
191 Raises:
192 OSError: if the original file could not be overwritten. The
193 lock file is still closed, so further attempts to write to the same
194 file object will raise ValueError.
195 """
196 if self._closed:
197 return
198 self._file.flush()
199 os.fsync(self._file.fileno())
200 self._file.close()
201 try:
202 if getattr(os, "replace", None) is not None:
203 os.replace(self._lockfilename, self._filename)
204 else:
205 if sys.platform != "win32":
206 os.rename(self._lockfilename, self._filename)
207 else:
208 # Windows versions prior to Vista don't support atomic
209 # renames
210 _fancy_rename(self._lockfilename, self._filename)
211 finally:
212 self.abort()
214 def __del__(self) -> None:
215 if not getattr(self, "_closed", True):
216 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
217 self.abort()
219 def __enter__(self):
220 return self
222 def __exit__(self, exc_type, exc_val, exc_tb):
223 if exc_type is not None:
224 self.abort()
225 else:
226 self.close()
228 def __getattr__(self, name):
229 """Proxy property calls to the underlying file."""
230 if name in self.PROXY_PROPERTIES:
231 return getattr(self._file, name)
232 raise AttributeError(name)