Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 65%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24__all__ = [ 

25 "FileLocked", 

26 "GitFile", 

27 "ensure_dir_exists", 

28] 

29 

30import os 

31import sys 

32import warnings 

33from collections.abc import Iterable, Iterator 

34from types import TracebackType 

35from typing import IO, Any, ClassVar, Literal, overload 

36 

37from ._typing import Buffer 

38 

39if sys.version_info >= (3, 11): 

40 from typing import Self 

41else: 

42 from typing_extensions import Self 

43 

44 

45def ensure_dir_exists( 

46 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

47) -> None: 

48 """Ensure a directory exists, creating if necessary.""" 

49 try: 

50 os.makedirs(dirname) 

51 except FileExistsError: 

52 pass 

53 

54 

55def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None: 

56 """Rename file with temporary backup file to rollback if rename fails.""" 

57 if not os.path.exists(newname): 

58 os.rename(oldname, newname) 

59 return 

60 

61 # Defer the tempfile import since it pulls in a lot of other things. 

62 import tempfile 

63 

64 # destination file exists 

65 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".") 

66 os.close(fd) 

67 os.remove(tmpfile) 

68 os.rename(newname, tmpfile) 

69 try: 

70 os.rename(oldname, newname) 

71 except OSError: 

72 os.rename(tmpfile, newname) 

73 raise 

74 os.remove(tmpfile) 

75 

76 

77@overload 

78def GitFile( 

79 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

80 mode: Literal["wb"], 

81 bufsize: int = -1, 

82 mask: int = 0o644, 

83 fsync: bool = True, 

84) -> "_GitFile": ... 

85 

86 

87@overload 

88def GitFile( 

89 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

90 mode: Literal["rb"] = "rb", 

91 bufsize: int = -1, 

92 mask: int = 0o644, 

93 fsync: bool = True, 

94) -> IO[bytes]: ... 

95 

96 

97@overload 

98def GitFile( 

99 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

100 mode: str = "rb", 

101 bufsize: int = -1, 

102 mask: int = 0o644, 

103 fsync: bool = True, 

104) -> "IO[bytes] | _GitFile": ... 

105 

106 

107def GitFile( 

108 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

109 mode: str = "rb", 

110 bufsize: int = -1, 

111 mask: int = 0o644, 

112 fsync: bool = True, 

113) -> "IO[bytes] | _GitFile": 

114 """Create a file object that obeys the git file locking protocol. 

115 

116 Returns: a builtin file object or a _GitFile object 

117 

118 Note: See _GitFile for a description of the file locking protocol. 

119 

120 Only read-only and write-only (binary) modes are supported; r+, w+, and a 

121 are not. To read and write from the same file, you can take advantage of 

122 the fact that opening a file for write does not actually open the file you 

123 request. 

124 

125 The default file mask makes any created files user-writable and 

126 world-readable. 

127 

128 Args: 

129 filename: Path to the file 

130 mode: File mode (only 'rb' and 'wb' are supported) 

131 bufsize: Buffer size for file operations 

132 mask: File mask for created files 

133 fsync: Whether to call fsync() before closing (default: True) 

134 

135 """ 

136 if "a" in mode: 

137 raise OSError("append mode not supported for Git files") 

138 if "+" in mode: 

139 raise OSError("read/write mode not supported for Git files") 

140 if "b" not in mode: 

141 raise OSError("text mode not supported for Git files") 

142 if "w" in mode: 

143 return _GitFile(filename, mode, bufsize, mask, fsync) 

144 else: 

145 return open(filename, mode, bufsize) 

146 

147 

148class FileLocked(Exception): 

149 """File is already locked.""" 

150 

151 def __init__( 

152 self, 

153 filename: str | bytes, 

154 lockfilename: str | bytes, 

155 ) -> None: 

156 """Initialize FileLocked. 

157 

158 Args: 

159 filename: Name of the file that is locked 

160 lockfilename: Name of the lock file 

161 """ 

162 self.filename = filename 

163 self.lockfilename = lockfilename 

164 super().__init__(filename, lockfilename) 

165 

166 

167class _GitFile(IO[bytes]): 

168 """File that follows the git locking protocol for writes. 

169 

170 All writes to a file foo will be written into foo.lock in the same 

171 directory, and the lockfile will be renamed to overwrite the original file 

172 on close. 

173 

174 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

175 released. Typically this will happen in a finally block. 

176 """ 

177 

178 _file: IO[bytes] 

179 _filename: str | bytes 

180 _lockfilename: str | bytes 

181 _closed: bool 

182 

183 PROXY_PROPERTIES: ClassVar[set[str]] = { 

184 "encoding", 

185 "errors", 

186 "mode", 

187 "name", 

188 "newlines", 

189 "softspace", 

190 } 

191 PROXY_METHODS: ClassVar[set[str]] = { 

192 "__iter__", 

193 "__next__", 

194 "flush", 

195 "fileno", 

196 "isatty", 

197 "read", 

198 "readable", 

199 "readline", 

200 "readlines", 

201 "seek", 

202 "seekable", 

203 "tell", 

204 "truncate", 

205 "writable", 

206 "write", 

207 "writelines", 

208 } 

209 

210 def __init__( 

211 self, 

212 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

213 mode: str, 

214 bufsize: int, 

215 mask: int, 

216 fsync: bool = True, 

217 ) -> None: 

218 # Convert PathLike to str/bytes for our internal use 

219 self._filename: str | bytes = os.fspath(filename) 

220 self._fsync = fsync 

221 if isinstance(self._filename, bytes): 

222 self._lockfilename: str | bytes = self._filename + b".lock" 

223 else: 

224 self._lockfilename = self._filename + ".lock" 

225 try: 

226 fd = os.open( 

227 self._lockfilename, 

228 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

229 mask, 

230 ) 

231 except FileExistsError as exc: 

232 raise FileLocked(self._filename, self._lockfilename) from exc 

233 self._file = os.fdopen(fd, mode, bufsize) 

234 self._closed = False 

235 

236 def __iter__(self) -> Iterator[bytes]: 

237 """Iterate over lines in the file.""" 

238 return iter(self._file) 

239 

240 def abort(self) -> None: 

241 """Close and discard the lockfile without overwriting the target. 

242 

243 If the file is already closed, this is a no-op. 

244 """ 

245 if self._closed: 

246 return 

247 self._file.close() 

248 try: 

249 os.remove(self._lockfilename) 

250 self._closed = True 

251 except FileNotFoundError: 

252 # The file may have been removed already, which is ok. 

253 self._closed = True 

254 

255 def close(self) -> None: 

256 """Close this file, saving the lockfile over the original. 

257 

258 Note: If this method fails, it will attempt to delete the lockfile. 

259 However, it is not guaranteed to do so (e.g. if a filesystem 

260 becomes suddenly read-only), which will prevent future writes to 

261 this file until the lockfile is removed manually. 

262 

263 Raises: 

264 OSError: if the original file could not be overwritten. The 

265 lock file is still closed, so further attempts to write to the same 

266 file object will raise ValueError. 

267 """ 

268 if self._closed: 

269 return 

270 self._file.flush() 

271 if self._fsync: 

272 os.fsync(self._file.fileno()) 

273 self._file.close() 

274 try: 

275 if getattr(os, "replace", None) is not None: 

276 os.replace(self._lockfilename, self._filename) 

277 else: 

278 if sys.platform != "win32": 

279 os.rename(self._lockfilename, self._filename) 

280 else: 

281 # Windows versions prior to Vista don't support atomic 

282 # renames 

283 _fancy_rename(self._lockfilename, self._filename) 

284 finally: 

285 self.abort() 

286 

287 def __del__(self) -> None: 

288 if not getattr(self, "_closed", True): 

289 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

290 self.abort() 

291 

292 def __enter__(self) -> Self: 

293 return self 

294 

295 def __exit__( 

296 self, 

297 exc_type: type[BaseException] | None, 

298 exc_val: BaseException | None, 

299 exc_tb: TracebackType | None, 

300 ) -> None: 

301 if exc_type is not None: 

302 self.abort() 

303 else: 

304 self.close() 

305 

306 def __fspath__(self) -> str | bytes: 

307 """Return the file path for os.fspath() compatibility.""" 

308 return self._filename 

309 

310 @property 

311 def closed(self) -> bool: 

312 """Return whether the file is closed.""" 

313 return self._closed 

314 

315 def __getattr__(self, name: str) -> Any: # noqa: ANN401 

316 """Proxy property calls to the underlying file.""" 

317 if name in self.PROXY_PROPERTIES: 

318 return getattr(self._file, name) 

319 raise AttributeError(name) 

320 

321 # Implement IO[bytes] methods by delegating to the underlying file 

322 def read(self, size: int = -1) -> bytes: 

323 return self._file.read(size) 

324 

325 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

326 # Python 3.10 has issues with IO[bytes] overload signatures 

327 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore] 

328 return self._file.write(data) 

329 

330 def readline(self, size: int = -1) -> bytes: 

331 return self._file.readline(size) 

332 

333 def readlines(self, hint: int = -1) -> list[bytes]: 

334 return self._file.readlines(hint) 

335 

336 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

337 # Python 3.10 has issues with IO[bytes] overload signatures 

338 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore] 

339 return self._file.writelines(lines) 

340 

341 def seek(self, offset: int, whence: int = 0) -> int: 

342 return self._file.seek(offset, whence) 

343 

344 def tell(self) -> int: 

345 return self._file.tell() 

346 

347 def flush(self) -> None: 

348 return self._file.flush() 

349 

350 def truncate(self, size: int | None = None) -> int: 

351 return self._file.truncate(size) 

352 

353 def fileno(self) -> int: 

354 return self._file.fileno() 

355 

356 def isatty(self) -> bool: 

357 return self._file.isatty() 

358 

359 def readable(self) -> bool: 

360 return self._file.readable() 

361 

362 def writable(self) -> bool: 

363 return self._file.writable() 

364 

365 def seekable(self) -> bool: 

366 return self._file.seekable() 

367 

368 def __next__(self) -> bytes: 

369 return next(iter(self._file))