Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 63%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

137 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from collections.abc import Iterable, Iterator 

28from types import TracebackType 

29from typing import IO, Any, ClassVar, Literal, Optional, Union, overload 

30 

31if sys.version_info >= (3, 12): 

32 from collections.abc import Buffer 

33else: 

34 Buffer = Union[bytes, bytearray, memoryview] 

35 

36 

37def ensure_dir_exists(dirname: Union[str, bytes, os.PathLike]) -> None: 

38 """Ensure a directory exists, creating if necessary.""" 

39 try: 

40 os.makedirs(dirname) 

41 except FileExistsError: 

42 pass 

43 

44 

45def _fancy_rename(oldname: Union[str, bytes], newname: Union[str, bytes]) -> None: 

46 """Rename file with temporary backup file to rollback if rename fails.""" 

47 if not os.path.exists(newname): 

48 os.rename(oldname, newname) 

49 return 

50 

51 # Defer the tempfile import since it pulls in a lot of other things. 

52 import tempfile 

53 

54 # destination file exists 

55 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".") 

56 os.close(fd) 

57 os.remove(tmpfile) 

58 os.rename(newname, tmpfile) 

59 try: 

60 os.rename(oldname, newname) 

61 except OSError: 

62 os.rename(tmpfile, newname) 

63 raise 

64 os.remove(tmpfile) 

65 

66 

67@overload 

68def GitFile( 

69 filename: Union[str, bytes, os.PathLike], 

70 mode: Literal["wb"], 

71 bufsize: int = -1, 

72 mask: int = 0o644, 

73) -> "_GitFile": ... 

74 

75 

76@overload 

77def GitFile( 

78 filename: Union[str, bytes, os.PathLike], 

79 mode: Literal["rb"] = "rb", 

80 bufsize: int = -1, 

81 mask: int = 0o644, 

82) -> IO[bytes]: ... 

83 

84 

85@overload 

86def GitFile( 

87 filename: Union[str, bytes, os.PathLike], 

88 mode: str = "rb", 

89 bufsize: int = -1, 

90 mask: int = 0o644, 

91) -> Union[IO[bytes], "_GitFile"]: ... 

92 

93 

94def GitFile( 

95 filename: Union[str, bytes, os.PathLike], 

96 mode: str = "rb", 

97 bufsize: int = -1, 

98 mask: int = 0o644, 

99) -> Union[IO[bytes], "_GitFile"]: 

100 """Create a file object that obeys the git file locking protocol. 

101 

102 Returns: a builtin file object or a _GitFile object 

103 

104 Note: See _GitFile for a description of the file locking protocol. 

105 

106 Only read-only and write-only (binary) modes are supported; r+, w+, and a 

107 are not. To read and write from the same file, you can take advantage of 

108 the fact that opening a file for write does not actually open the file you 

109 request. 

110 

111 The default file mask makes any created files user-writable and 

112 world-readable. 

113 

114 """ 

115 if "a" in mode: 

116 raise OSError("append mode not supported for Git files") 

117 if "+" in mode: 

118 raise OSError("read/write mode not supported for Git files") 

119 if "b" not in mode: 

120 raise OSError("text mode not supported for Git files") 

121 if "w" in mode: 

122 return _GitFile(filename, mode, bufsize, mask) 

123 else: 

124 return open(filename, mode, bufsize) 

125 

126 

127class FileLocked(Exception): 

128 """File is already locked.""" 

129 

130 def __init__( 

131 self, filename: Union[str, bytes, os.PathLike], lockfilename: Union[str, bytes] 

132 ) -> None: 

133 """Initialize FileLocked. 

134 

135 Args: 

136 filename: Name of the file that is locked 

137 lockfilename: Name of the lock file 

138 """ 

139 self.filename = filename 

140 self.lockfilename = lockfilename 

141 super().__init__(filename, lockfilename) 

142 

143 

144class _GitFile(IO[bytes]): 

145 """File that follows the git locking protocol for writes. 

146 

147 All writes to a file foo will be written into foo.lock in the same 

148 directory, and the lockfile will be renamed to overwrite the original file 

149 on close. 

150 

151 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

152 released. Typically this will happen in a finally block. 

153 """ 

154 

155 PROXY_PROPERTIES: ClassVar[set[str]] = { 

156 "encoding", 

157 "errors", 

158 "mode", 

159 "name", 

160 "newlines", 

161 "softspace", 

162 } 

163 PROXY_METHODS: ClassVar[set[str]] = { 

164 "__iter__", 

165 "__next__", 

166 "flush", 

167 "fileno", 

168 "isatty", 

169 "read", 

170 "readable", 

171 "readline", 

172 "readlines", 

173 "seek", 

174 "seekable", 

175 "tell", 

176 "truncate", 

177 "writable", 

178 "write", 

179 "writelines", 

180 } 

181 

182 def __init__( 

183 self, 

184 filename: Union[str, bytes, os.PathLike], 

185 mode: str, 

186 bufsize: int, 

187 mask: int, 

188 ) -> None: 

189 # Convert PathLike to str/bytes for our internal use 

190 self._filename: Union[str, bytes] = os.fspath(filename) 

191 if isinstance(self._filename, bytes): 

192 self._lockfilename: Union[str, bytes] = self._filename + b".lock" 

193 else: 

194 self._lockfilename = self._filename + ".lock" 

195 try: 

196 fd = os.open( 

197 self._lockfilename, 

198 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

199 mask, 

200 ) 

201 except FileExistsError as exc: 

202 raise FileLocked(filename, self._lockfilename) from exc 

203 self._file = os.fdopen(fd, mode, bufsize) 

204 self._closed = False 

205 

206 def __iter__(self) -> Iterator[bytes]: 

207 """Iterate over lines in the file.""" 

208 return iter(self._file) 

209 

210 def abort(self) -> None: 

211 """Close and discard the lockfile without overwriting the target. 

212 

213 If the file is already closed, this is a no-op. 

214 """ 

215 if self._closed: 

216 return 

217 self._file.close() 

218 try: 

219 os.remove(self._lockfilename) 

220 self._closed = True 

221 except FileNotFoundError: 

222 # The file may have been removed already, which is ok. 

223 self._closed = True 

224 

225 def close(self) -> None: 

226 """Close this file, saving the lockfile over the original. 

227 

228 Note: If this method fails, it will attempt to delete the lockfile. 

229 However, it is not guaranteed to do so (e.g. if a filesystem 

230 becomes suddenly read-only), which will prevent future writes to 

231 this file until the lockfile is removed manually. 

232 

233 Raises: 

234 OSError: if the original file could not be overwritten. The 

235 lock file is still closed, so further attempts to write to the same 

236 file object will raise ValueError. 

237 """ 

238 if self._closed: 

239 return 

240 self._file.flush() 

241 os.fsync(self._file.fileno()) 

242 self._file.close() 

243 try: 

244 if getattr(os, "replace", None) is not None: 

245 os.replace(self._lockfilename, self._filename) 

246 else: 

247 if sys.platform != "win32": 

248 os.rename(self._lockfilename, self._filename) 

249 else: 

250 # Windows versions prior to Vista don't support atomic 

251 # renames 

252 _fancy_rename(self._lockfilename, self._filename) 

253 finally: 

254 self.abort() 

255 

256 def __del__(self) -> None: 

257 if not getattr(self, "_closed", True): 

258 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

259 self.abort() 

260 

261 def __enter__(self) -> "_GitFile": 

262 return self 

263 

264 def __exit__( 

265 self, 

266 exc_type: Optional[type[BaseException]], 

267 exc_val: Optional[BaseException], 

268 exc_tb: Optional[TracebackType], 

269 ) -> None: 

270 if exc_type is not None: 

271 self.abort() 

272 else: 

273 self.close() 

274 

275 @property 

276 def closed(self) -> bool: 

277 """Return whether the file is closed.""" 

278 return self._closed 

279 

280 def __getattr__(self, name: str) -> Any: # noqa: ANN401 

281 """Proxy property calls to the underlying file.""" 

282 if name in self.PROXY_PROPERTIES: 

283 return getattr(self._file, name) 

284 raise AttributeError(name) 

285 

286 # Implement IO[bytes] methods by delegating to the underlying file 

287 def read(self, size: int = -1) -> bytes: 

288 return self._file.read(size) 

289 

290 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

291 # Python 3.9/3.10 have issues with IO[bytes] overload signatures 

292 def write(self, data: Buffer, /) -> int: # type: ignore[override] 

293 return self._file.write(data) 

294 

295 def readline(self, size: int = -1) -> bytes: 

296 return self._file.readline(size) 

297 

298 def readlines(self, hint: int = -1) -> list[bytes]: 

299 return self._file.readlines(hint) 

300 

301 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

302 # Python 3.9/3.10 have issues with IO[bytes] overload signatures 

303 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override] 

304 return self._file.writelines(lines) 

305 

306 def seek(self, offset: int, whence: int = 0) -> int: 

307 return self._file.seek(offset, whence) 

308 

309 def tell(self) -> int: 

310 return self._file.tell() 

311 

312 def flush(self) -> None: 

313 return self._file.flush() 

314 

315 def truncate(self, size: Optional[int] = None) -> int: 

316 return self._file.truncate(size) 

317 

318 def fileno(self) -> int: 

319 return self._file.fileno() 

320 

321 def isatty(self) -> bool: 

322 return self._file.isatty() 

323 

324 def readable(self) -> bool: 

325 return self._file.readable() 

326 

327 def writable(self) -> bool: 

328 return self._file.writable() 

329 

330 def seekable(self) -> bool: 

331 return self._file.seekable() 

332 

333 def __next__(self) -> bytes: 

334 return next(iter(self._file))