Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 65%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24__all__ = [ 

25 "FileLocked", 

26 "GitFile", 

27 "ensure_dir_exists", 

28] 

29 

30import os 

31import sys 

32import warnings 

33from collections.abc import Iterable, Iterator 

34from types import TracebackType 

35from typing import IO, Any, ClassVar, Literal, overload 

36 

37from ._typing import Buffer 

38 

39 

40def ensure_dir_exists( 

41 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

42) -> None: 

43 """Ensure a directory exists, creating if necessary.""" 

44 try: 

45 os.makedirs(dirname) 

46 except FileExistsError: 

47 pass 

48 

49 

50def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None: 

51 """Rename file with temporary backup file to rollback if rename fails.""" 

52 if not os.path.exists(newname): 

53 os.rename(oldname, newname) 

54 return 

55 

56 # Defer the tempfile import since it pulls in a lot of other things. 

57 import tempfile 

58 

59 # destination file exists 

60 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".") 

61 os.close(fd) 

62 os.remove(tmpfile) 

63 os.rename(newname, tmpfile) 

64 try: 

65 os.rename(oldname, newname) 

66 except OSError: 

67 os.rename(tmpfile, newname) 

68 raise 

69 os.remove(tmpfile) 

70 

71 

72@overload 

73def GitFile( 

74 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

75 mode: Literal["wb"], 

76 bufsize: int = -1, 

77 mask: int = 0o644, 

78 fsync: bool = True, 

79) -> "_GitFile": ... 

80 

81 

82@overload 

83def GitFile( 

84 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

85 mode: Literal["rb"] = "rb", 

86 bufsize: int = -1, 

87 mask: int = 0o644, 

88 fsync: bool = True, 

89) -> IO[bytes]: ... 

90 

91 

92@overload 

93def GitFile( 

94 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

95 mode: str = "rb", 

96 bufsize: int = -1, 

97 mask: int = 0o644, 

98 fsync: bool = True, 

99) -> "IO[bytes] | _GitFile": ... 

100 

101 

102def GitFile( 

103 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

104 mode: str = "rb", 

105 bufsize: int = -1, 

106 mask: int = 0o644, 

107 fsync: bool = True, 

108) -> "IO[bytes] | _GitFile": 

109 """Create a file object that obeys the git file locking protocol. 

110 

111 Returns: a builtin file object or a _GitFile object 

112 

113 Note: See _GitFile for a description of the file locking protocol. 

114 

115 Only read-only and write-only (binary) modes are supported; r+, w+, and a 

116 are not. To read and write from the same file, you can take advantage of 

117 the fact that opening a file for write does not actually open the file you 

118 request. 

119 

120 The default file mask makes any created files user-writable and 

121 world-readable. 

122 

123 Args: 

124 filename: Path to the file 

125 mode: File mode (only 'rb' and 'wb' are supported) 

126 bufsize: Buffer size for file operations 

127 mask: File mask for created files 

128 fsync: Whether to call fsync() before closing (default: True) 

129 

130 """ 

131 if "a" in mode: 

132 raise OSError("append mode not supported for Git files") 

133 if "+" in mode: 

134 raise OSError("read/write mode not supported for Git files") 

135 if "b" not in mode: 

136 raise OSError("text mode not supported for Git files") 

137 if "w" in mode: 

138 return _GitFile(filename, mode, bufsize, mask, fsync) 

139 else: 

140 return open(filename, mode, bufsize) 

141 

142 

143class FileLocked(Exception): 

144 """File is already locked.""" 

145 

146 def __init__( 

147 self, 

148 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

149 lockfilename: str | bytes, 

150 ) -> None: 

151 """Initialize FileLocked. 

152 

153 Args: 

154 filename: Name of the file that is locked 

155 lockfilename: Name of the lock file 

156 """ 

157 self.filename = filename 

158 self.lockfilename = lockfilename 

159 super().__init__(filename, lockfilename) 

160 

161 

162class _GitFile(IO[bytes]): 

163 """File that follows the git locking protocol for writes. 

164 

165 All writes to a file foo will be written into foo.lock in the same 

166 directory, and the lockfile will be renamed to overwrite the original file 

167 on close. 

168 

169 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

170 released. Typically this will happen in a finally block. 

171 """ 

172 

173 _file: IO[bytes] 

174 _filename: str | bytes 

175 _lockfilename: str | bytes 

176 _closed: bool 

177 

178 PROXY_PROPERTIES: ClassVar[set[str]] = { 

179 "encoding", 

180 "errors", 

181 "mode", 

182 "name", 

183 "newlines", 

184 "softspace", 

185 } 

186 PROXY_METHODS: ClassVar[set[str]] = { 

187 "__iter__", 

188 "__next__", 

189 "flush", 

190 "fileno", 

191 "isatty", 

192 "read", 

193 "readable", 

194 "readline", 

195 "readlines", 

196 "seek", 

197 "seekable", 

198 "tell", 

199 "truncate", 

200 "writable", 

201 "write", 

202 "writelines", 

203 } 

204 

205 def __init__( 

206 self, 

207 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

208 mode: str, 

209 bufsize: int, 

210 mask: int, 

211 fsync: bool = True, 

212 ) -> None: 

213 # Convert PathLike to str/bytes for our internal use 

214 self._filename: str | bytes = os.fspath(filename) 

215 self._fsync = fsync 

216 if isinstance(self._filename, bytes): 

217 self._lockfilename: str | bytes = self._filename + b".lock" 

218 else: 

219 self._lockfilename = self._filename + ".lock" 

220 try: 

221 fd = os.open( 

222 self._lockfilename, 

223 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

224 mask, 

225 ) 

226 except FileExistsError as exc: 

227 raise FileLocked(filename, self._lockfilename) from exc 

228 self._file = os.fdopen(fd, mode, bufsize) 

229 self._closed = False 

230 

231 def __iter__(self) -> Iterator[bytes]: 

232 """Iterate over lines in the file.""" 

233 return iter(self._file) 

234 

235 def abort(self) -> None: 

236 """Close and discard the lockfile without overwriting the target. 

237 

238 If the file is already closed, this is a no-op. 

239 """ 

240 if self._closed: 

241 return 

242 self._file.close() 

243 try: 

244 os.remove(self._lockfilename) 

245 self._closed = True 

246 except FileNotFoundError: 

247 # The file may have been removed already, which is ok. 

248 self._closed = True 

249 

250 def close(self) -> None: 

251 """Close this file, saving the lockfile over the original. 

252 

253 Note: If this method fails, it will attempt to delete the lockfile. 

254 However, it is not guaranteed to do so (e.g. if a filesystem 

255 becomes suddenly read-only), which will prevent future writes to 

256 this file until the lockfile is removed manually. 

257 

258 Raises: 

259 OSError: if the original file could not be overwritten. The 

260 lock file is still closed, so further attempts to write to the same 

261 file object will raise ValueError. 

262 """ 

263 if self._closed: 

264 return 

265 self._file.flush() 

266 if self._fsync: 

267 os.fsync(self._file.fileno()) 

268 self._file.close() 

269 try: 

270 if getattr(os, "replace", None) is not None: 

271 os.replace(self._lockfilename, self._filename) 

272 else: 

273 if sys.platform != "win32": 

274 os.rename(self._lockfilename, self._filename) 

275 else: 

276 # Windows versions prior to Vista don't support atomic 

277 # renames 

278 _fancy_rename(self._lockfilename, self._filename) 

279 finally: 

280 self.abort() 

281 

282 def __del__(self) -> None: 

283 if not getattr(self, "_closed", True): 

284 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

285 self.abort() 

286 

287 def __enter__(self) -> "_GitFile": 

288 return self 

289 

290 def __exit__( 

291 self, 

292 exc_type: type[BaseException] | None, 

293 exc_val: BaseException | None, 

294 exc_tb: TracebackType | None, 

295 ) -> None: 

296 if exc_type is not None: 

297 self.abort() 

298 else: 

299 self.close() 

300 

301 def __fspath__(self) -> str | bytes: 

302 """Return the file path for os.fspath() compatibility.""" 

303 return self._filename 

304 

305 @property 

306 def closed(self) -> bool: 

307 """Return whether the file is closed.""" 

308 return self._closed 

309 

310 def __getattr__(self, name: str) -> Any: # noqa: ANN401 

311 """Proxy property calls to the underlying file.""" 

312 if name in self.PROXY_PROPERTIES: 

313 return getattr(self._file, name) 

314 raise AttributeError(name) 

315 

316 # Implement IO[bytes] methods by delegating to the underlying file 

317 def read(self, size: int = -1) -> bytes: 

318 return self._file.read(size) 

319 

320 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

321 # Python 3.10 has issues with IO[bytes] overload signatures 

322 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore] 

323 return self._file.write(data) 

324 

325 def readline(self, size: int = -1) -> bytes: 

326 return self._file.readline(size) 

327 

328 def readlines(self, hint: int = -1) -> list[bytes]: 

329 return self._file.readlines(hint) 

330 

331 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

332 # Python 3.10 has issues with IO[bytes] overload signatures 

333 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore] 

334 return self._file.writelines(lines) 

335 

336 def seek(self, offset: int, whence: int = 0) -> int: 

337 return self._file.seek(offset, whence) 

338 

339 def tell(self) -> int: 

340 return self._file.tell() 

341 

342 def flush(self) -> None: 

343 return self._file.flush() 

344 

345 def truncate(self, size: int | None = None) -> int: 

346 return self._file.truncate(size) 

347 

348 def fileno(self) -> int: 

349 return self._file.fileno() 

350 

351 def isatty(self) -> bool: 

352 return self._file.isatty() 

353 

354 def readable(self) -> bool: 

355 return self._file.readable() 

356 

357 def writable(self) -> bool: 

358 return self._file.writable() 

359 

360 def seekable(self) -> bool: 

361 return self._file.seekable() 

362 

363 def __next__(self) -> bytes: 

364 return next(iter(self._file))