Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from collections.abc import Iterable, Iterator 

28from types import TracebackType 

29from typing import IO, Any, ClassVar, Literal, overload 

30 

31from ._typing import Buffer 

32 

33 

34def ensure_dir_exists( 

35 dirname: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

36) -> None: 

37 """Ensure a directory exists, creating if necessary.""" 

38 try: 

39 os.makedirs(dirname) 

40 except FileExistsError: 

41 pass 

42 

43 

44def _fancy_rename(oldname: str | bytes, newname: str | bytes) -> None: 

45 """Rename file with temporary backup file to rollback if rename fails.""" 

46 if not os.path.exists(newname): 

47 os.rename(oldname, newname) 

48 return 

49 

50 # Defer the tempfile import since it pulls in a lot of other things. 

51 import tempfile 

52 

53 # destination file exists 

54 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".") 

55 os.close(fd) 

56 os.remove(tmpfile) 

57 os.rename(newname, tmpfile) 

58 try: 

59 os.rename(oldname, newname) 

60 except OSError: 

61 os.rename(tmpfile, newname) 

62 raise 

63 os.remove(tmpfile) 

64 

65 

66@overload 

67def GitFile( 

68 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

69 mode: Literal["wb"], 

70 bufsize: int = -1, 

71 mask: int = 0o644, 

72 fsync: bool = True, 

73) -> "_GitFile": ... 

74 

75 

76@overload 

77def GitFile( 

78 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

79 mode: Literal["rb"] = "rb", 

80 bufsize: int = -1, 

81 mask: int = 0o644, 

82 fsync: bool = True, 

83) -> IO[bytes]: ... 

84 

85 

86@overload 

87def GitFile( 

88 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

89 mode: str = "rb", 

90 bufsize: int = -1, 

91 mask: int = 0o644, 

92 fsync: bool = True, 

93) -> "IO[bytes] | _GitFile": ... 

94 

95 

96def GitFile( 

97 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

98 mode: str = "rb", 

99 bufsize: int = -1, 

100 mask: int = 0o644, 

101 fsync: bool = True, 

102) -> "IO[bytes] | _GitFile": 

103 """Create a file object that obeys the git file locking protocol. 

104 

105 Returns: a builtin file object or a _GitFile object 

106 

107 Note: See _GitFile for a description of the file locking protocol. 

108 

109 Only read-only and write-only (binary) modes are supported; r+, w+, and a 

110 are not. To read and write from the same file, you can take advantage of 

111 the fact that opening a file for write does not actually open the file you 

112 request. 

113 

114 The default file mask makes any created files user-writable and 

115 world-readable. 

116 

117 Args: 

118 filename: Path to the file 

119 mode: File mode (only 'rb' and 'wb' are supported) 

120 bufsize: Buffer size for file operations 

121 mask: File mask for created files 

122 fsync: Whether to call fsync() before closing (default: True) 

123 

124 """ 

125 if "a" in mode: 

126 raise OSError("append mode not supported for Git files") 

127 if "+" in mode: 

128 raise OSError("read/write mode not supported for Git files") 

129 if "b" not in mode: 

130 raise OSError("text mode not supported for Git files") 

131 if "w" in mode: 

132 return _GitFile(filename, mode, bufsize, mask, fsync) 

133 else: 

134 return open(filename, mode, bufsize) 

135 

136 

137class FileLocked(Exception): 

138 """File is already locked.""" 

139 

140 def __init__( 

141 self, 

142 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

143 lockfilename: str | bytes, 

144 ) -> None: 

145 """Initialize FileLocked. 

146 

147 Args: 

148 filename: Name of the file that is locked 

149 lockfilename: Name of the lock file 

150 """ 

151 self.filename = filename 

152 self.lockfilename = lockfilename 

153 super().__init__(filename, lockfilename) 

154 

155 

156class _GitFile(IO[bytes]): 

157 """File that follows the git locking protocol for writes. 

158 

159 All writes to a file foo will be written into foo.lock in the same 

160 directory, and the lockfile will be renamed to overwrite the original file 

161 on close. 

162 

163 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

164 released. Typically this will happen in a finally block. 

165 """ 

166 

167 _file: IO[bytes] 

168 _filename: str | bytes 

169 _lockfilename: str | bytes 

170 _closed: bool 

171 

172 PROXY_PROPERTIES: ClassVar[set[str]] = { 

173 "encoding", 

174 "errors", 

175 "mode", 

176 "name", 

177 "newlines", 

178 "softspace", 

179 } 

180 PROXY_METHODS: ClassVar[set[str]] = { 

181 "__iter__", 

182 "__next__", 

183 "flush", 

184 "fileno", 

185 "isatty", 

186 "read", 

187 "readable", 

188 "readline", 

189 "readlines", 

190 "seek", 

191 "seekable", 

192 "tell", 

193 "truncate", 

194 "writable", 

195 "write", 

196 "writelines", 

197 } 

198 

199 def __init__( 

200 self, 

201 filename: str | bytes | os.PathLike[str] | os.PathLike[bytes], 

202 mode: str, 

203 bufsize: int, 

204 mask: int, 

205 fsync: bool = True, 

206 ) -> None: 

207 # Convert PathLike to str/bytes for our internal use 

208 self._filename: str | bytes = os.fspath(filename) 

209 self._fsync = fsync 

210 if isinstance(self._filename, bytes): 

211 self._lockfilename: str | bytes = self._filename + b".lock" 

212 else: 

213 self._lockfilename = self._filename + ".lock" 

214 try: 

215 fd = os.open( 

216 self._lockfilename, 

217 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

218 mask, 

219 ) 

220 except FileExistsError as exc: 

221 raise FileLocked(filename, self._lockfilename) from exc 

222 self._file = os.fdopen(fd, mode, bufsize) 

223 self._closed = False 

224 

225 def __iter__(self) -> Iterator[bytes]: 

226 """Iterate over lines in the file.""" 

227 return iter(self._file) 

228 

229 def abort(self) -> None: 

230 """Close and discard the lockfile without overwriting the target. 

231 

232 If the file is already closed, this is a no-op. 

233 """ 

234 if self._closed: 

235 return 

236 self._file.close() 

237 try: 

238 os.remove(self._lockfilename) 

239 self._closed = True 

240 except FileNotFoundError: 

241 # The file may have been removed already, which is ok. 

242 self._closed = True 

243 

244 def close(self) -> None: 

245 """Close this file, saving the lockfile over the original. 

246 

247 Note: If this method fails, it will attempt to delete the lockfile. 

248 However, it is not guaranteed to do so (e.g. if a filesystem 

249 becomes suddenly read-only), which will prevent future writes to 

250 this file until the lockfile is removed manually. 

251 

252 Raises: 

253 OSError: if the original file could not be overwritten. The 

254 lock file is still closed, so further attempts to write to the same 

255 file object will raise ValueError. 

256 """ 

257 if self._closed: 

258 return 

259 self._file.flush() 

260 if self._fsync: 

261 os.fsync(self._file.fileno()) 

262 self._file.close() 

263 try: 

264 if getattr(os, "replace", None) is not None: 

265 os.replace(self._lockfilename, self._filename) 

266 else: 

267 if sys.platform != "win32": 

268 os.rename(self._lockfilename, self._filename) 

269 else: 

270 # Windows versions prior to Vista don't support atomic 

271 # renames 

272 _fancy_rename(self._lockfilename, self._filename) 

273 finally: 

274 self.abort() 

275 

276 def __del__(self) -> None: 

277 if not getattr(self, "_closed", True): 

278 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

279 self.abort() 

280 

281 def __enter__(self) -> "_GitFile": 

282 return self 

283 

284 def __exit__( 

285 self, 

286 exc_type: type[BaseException] | None, 

287 exc_val: BaseException | None, 

288 exc_tb: TracebackType | None, 

289 ) -> None: 

290 if exc_type is not None: 

291 self.abort() 

292 else: 

293 self.close() 

294 

295 def __fspath__(self) -> str | bytes: 

296 """Return the file path for os.fspath() compatibility.""" 

297 return self._filename 

298 

299 @property 

300 def closed(self) -> bool: 

301 """Return whether the file is closed.""" 

302 return self._closed 

303 

304 def __getattr__(self, name: str) -> Any: # noqa: ANN401 

305 """Proxy property calls to the underlying file.""" 

306 if name in self.PROXY_PROPERTIES: 

307 return getattr(self._file, name) 

308 raise AttributeError(name) 

309 

310 # Implement IO[bytes] methods by delegating to the underlying file 

311 def read(self, size: int = -1) -> bytes: 

312 return self._file.read(size) 

313 

314 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

315 # Python 3.10 has issues with IO[bytes] overload signatures 

316 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore] 

317 return self._file.write(data) 

318 

319 def readline(self, size: int = -1) -> bytes: 

320 return self._file.readline(size) 

321 

322 def readlines(self, hint: int = -1) -> list[bytes]: 

323 return self._file.readlines(hint) 

324 

325 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

326 # Python 3.10 has issues with IO[bytes] overload signatures 

327 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore] 

328 return self._file.writelines(lines) 

329 

330 def seek(self, offset: int, whence: int = 0) -> int: 

331 return self._file.seek(offset, whence) 

332 

333 def tell(self) -> int: 

334 return self._file.tell() 

335 

336 def flush(self) -> None: 

337 return self._file.flush() 

338 

339 def truncate(self, size: int | None = None) -> int: 

340 return self._file.truncate(size) 

341 

342 def fileno(self) -> int: 

343 return self._file.fileno() 

344 

345 def isatty(self) -> bool: 

346 return self._file.isatty() 

347 

348 def readable(self) -> bool: 

349 return self._file.readable() 

350 

351 def writable(self) -> bool: 

352 return self._file.writable() 

353 

354 def seekable(self) -> bool: 

355 return self._file.seekable() 

356 

357 def __next__(self) -> bytes: 

358 return next(iter(self._file))