Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from collections.abc import Iterable, Iterator 

28from types import TracebackType 

29from typing import IO, Any, ClassVar, Literal, Optional, Union, overload 

30 

31if sys.version_info >= (3, 12): 

32 from collections.abc import Buffer 

33else: 

34 Buffer = Union[bytes, bytearray, memoryview] 

35 

36 

def ensure_dir_exists(
    dirname: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
) -> None:
    """Create ``dirname`` and any missing parents unless the path already exists.

    Args:
        dirname: Path of the directory to create.

    A ``FileExistsError`` raised by ``os.makedirs`` is swallowed; every other
    ``OSError`` propagates to the caller.
    """
    try:
        os.makedirs(dirname)
    except FileExistsError:
        # Something is already present at this path, so there is nothing to do.
        return

45 

46 

def _fancy_rename(oldname: Union[str, bytes], newname: Union[str, bytes]) -> None:
    """Rename file with temporary backup file to rollback if rename fails.

    If ``newname`` does not exist this is a plain ``os.rename``. Otherwise the
    existing destination is first moved aside to a uniquely named backup,
    ``oldname`` is renamed into place, and the backup is deleted. If the
    second rename fails the backup is moved back to ``newname`` and the error
    is re-raised.

    Args:
        oldname: Path of the file to move.
        newname: Path the file should end up at.

    Raises:
        OSError: if any of the underlying rename/remove operations fails.
    """
    if not os.path.exists(newname):
        os.rename(oldname, newname)
        return

    # Defer the tempfile import since it pulls in a lot of other things.
    import tempfile

    # destination file exists
    #
    # Build the backup name from decoded path components: str() on a bytes
    # path would yield its repr ("b'...'") and point mkstemp at a directory
    # that does not exist. The backup lives next to the destination so that
    # moving the destination aside never has to cross a filesystem boundary.
    backup_dir = os.path.dirname(os.fsdecode(newname)) or os.curdir
    backup_prefix = os.path.basename(os.fsdecode(oldname))
    (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=backup_prefix, dir=backup_dir)
    # mkstemp is only used to reserve a unique name; the placeholder itself
    # must go away so the rename below can take its place.
    os.close(fd)
    os.remove(tmpfile)
    os.rename(newname, tmpfile)
    try:
        os.rename(oldname, newname)
    except OSError:
        # Roll back: restore the previous destination before reporting.
        os.rename(tmpfile, newname)
        raise
    os.remove(tmpfile)

67 

68 

@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: Literal["wb"],
    bufsize: int = -1,
    mask: int = 0o644,
    fsync: bool = True,
) -> "_GitFile": ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: Literal["rb"] = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
    fsync: bool = True,
) -> IO[bytes]: ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
    fsync: bool = True,
) -> Union[IO[bytes], "_GitFile"]: ...


def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
    fsync: bool = True,
) -> Union[IO[bytes], "_GitFile"]:
    """Open a file for binary reading, or for writing under the git lock protocol.

    Modes containing ``w`` yield a ``_GitFile``; every other accepted mode is
    handed straight to the builtin ``open``. See ``_GitFile`` for a
    description of the locking protocol used for writes.

    Append (``a``), update (``+``) and text modes are rejected. Because a
    write does not open the requested file itself, the same path can be read
    through a separate handle while a write is in progress.

    With the default mask, files that get created are user-writable and
    world-readable.

    Args:
      filename: Path to the file
      mode: File mode (only 'rb' and 'wb' are supported)
      bufsize: Buffer size for file operations
      mask: File mask for created files
      fsync: Whether to call fsync() before closing (default: True)

    Returns: a builtin file object or a _GitFile object

    Raises:
      OSError: if ``mode`` requests append, read/write or text access.
    """
    # Flags that are never acceptable, checked in this order so the reported
    # reason is stable for modes that violate more than one rule.
    rejected_flags = (
        ("a", "append mode not supported for Git files"),
        ("+", "read/write mode not supported for Git files"),
    )
    for flag, reason in rejected_flags:
        if flag in mode:
            raise OSError(reason)
    if "b" not in mode:
        raise OSError("text mode not supported for Git files")

    if "w" not in mode:
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask, fsync)

138 

139 

class FileLocked(Exception):
    """Raised when the lock file for a path already exists."""

    def __init__(
        self,
        filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
        lockfilename: Union[str, bytes],
    ) -> None:
        """Record which file is locked and which lock file guards it.

        Both values are kept as attributes and also forwarded to
        ``Exception`` so they show up in ``args``.

        Args:
          filename: Name of the file that is locked
          lockfilename: Name of the lock file
        """
        super().__init__(filename, lockfilename)
        self.lockfilename = lockfilename
        self.filename = filename

157 

158 

class _GitFile(IO[bytes]):
    """File that follows the git locking protocol for writes.

    All writes to a file foo will be written into foo.lock in the same
    directory, and the lockfile will be renamed to overwrite the original file
    on close.

    Note: You *must* call close() or abort() on a _GitFile for the lock to be
        released. Typically this will happen in a finally block.
    """

    _file: IO[bytes]
    _filename: Union[str, bytes]
    _lockfilename: Union[str, bytes]
    _closed: bool

    # Attribute names that __getattr__ forwards to the underlying file object.
    PROXY_PROPERTIES: ClassVar[set[str]] = {
        "encoding",
        "errors",
        "mode",
        "name",
        "newlines",
        "softspace",
    }
    # Names of the file methods this class delegates to the underlying file.
    PROXY_METHODS: ClassVar[set[str]] = {
        "__iter__",
        "__next__",
        "flush",
        "fileno",
        "isatty",
        "read",
        "readable",
        "readline",
        "readlines",
        "seek",
        "seekable",
        "tell",
        "truncate",
        "writable",
        "write",
        "writelines",
    }

    def __init__(
        self,
        filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
        mode: str,
        bufsize: int,
        mask: int,
        fsync: bool = True,
    ) -> None:
        """Acquire ``<filename>.lock`` and open it for writing.

        Args:
          filename: Path of the file that will be replaced on close()
          mode: Mode passed to os.fdopen() for the lock file
          bufsize: Buffer size passed to os.fdopen()
          mask: Permission bits used when creating the lock file
          fsync: Whether close() calls os.fsync() before renaming

        Raises:
          FileLocked: if the lock file already exists.
        """
        # Convert PathLike to str/bytes for our internal use
        self._filename: Union[str, bytes] = os.fspath(filename)
        self._fsync = fsync
        if isinstance(self._filename, bytes):
            self._lockfilename: Union[str, bytes] = self._filename + b".lock"
        else:
            self._lockfilename = self._filename + ".lock"
        try:
            # O_EXCL makes creation fail if the lock file is already there.
            fd = os.open(
                self._lockfilename,
                os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
                mask,
            )
        except FileExistsError as exc:
            raise FileLocked(filename, self._lockfilename) from exc
        try:
            self._file = os.fdopen(fd, mode, bufsize)
        except Exception:
            # Wrapping the descriptor failed (e.g. an invalid mode string).
            # Release the descriptor and the lock file we just created;
            # otherwise the stale lock would block every later writer.
            try:
                os.close(fd)
            except OSError:
                # fdopen may already have closed the descriptor itself.
                pass
            try:
                os.remove(self._lockfilename)
            except FileNotFoundError:
                pass
            raise
        self._closed = False

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over lines in the file."""
        return iter(self._file)

    def abort(self) -> None:
        """Close and discard the lockfile without overwriting the target.

        If the file is already closed, this is a no-op.
        """
        if self._closed:
            return
        self._file.close()
        try:
            os.remove(self._lockfilename)
            self._closed = True
        except FileNotFoundError:
            # The file may have been removed already, which is ok.
            self._closed = True

    def close(self) -> None:
        """Close this file, saving the lockfile over the original.

        Note: If this method fails, it will attempt to delete the lockfile.
            However, it is not guaranteed to do so (e.g. if a filesystem
            becomes suddenly read-only), which will prevent future writes to
            this file until the lockfile is removed manually.

        Raises:
          OSError: if the original file could not be overwritten. The
            lock file is still closed, so further attempts to write to the same
            file object will raise ValueError.
        """
        if self._closed:
            return
        self._file.flush()
        if self._fsync:
            os.fsync(self._file.fileno())
        self._file.close()
        try:
            # os.replace is available on every Python version this module
            # supports, so no rename fallback is needed.
            os.replace(self._lockfilename, self._filename)
        finally:
            # Marks the object closed; also removes the lock file if the
            # replace above failed and left it behind.
            self.abort()

    def __del__(self) -> None:
        """Warn about and abort a file that was never closed or aborted."""
        # _closed is missing if __init__ failed before the lock was taken;
        # treat that the same as "already closed".
        if not getattr(self, "_closed", True):
            warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
            self.abort()

    def __enter__(self) -> "_GitFile":
        """Return this object for use in a ``with`` statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Commit via close() on a clean exit, abort() if the body raised."""
        if exc_type is not None:
            self.abort()
        else:
            self.close()

    def __fspath__(self) -> Union[str, bytes]:
        """Return the file path for os.fspath() compatibility."""
        return self._filename

    @property
    def closed(self) -> bool:
        """Return whether the file is closed."""
        return self._closed

    def __getattr__(self, name: str) -> Any:  # noqa: ANN401
        """Proxy property calls to the underlying file."""
        if name in self.PROXY_PROPERTIES:
            return getattr(self._file, name)
        raise AttributeError(name)

    # Implement IO[bytes] methods by delegating to the underlying file
    def read(self, size: int = -1) -> bytes:
        """Delegate read() to the underlying lock file object."""
        return self._file.read(size)

    # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
    # Python 3.9/3.10 have issues with IO[bytes] overload signatures
    def write(self, data: Buffer, /) -> int:  # type: ignore[override,unused-ignore]
        """Write ``data`` to the lock file and return the underlying result."""
        return self._file.write(data)

    def readline(self, size: int = -1) -> bytes:
        """Delegate readline() to the underlying lock file object."""
        return self._file.readline(size)

    def readlines(self, hint: int = -1) -> list[bytes]:
        """Delegate readlines() to the underlying lock file object."""
        return self._file.readlines(hint)

    # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026)
    # Python 3.9/3.10 have issues with IO[bytes] overload signatures
    def writelines(self, lines: Iterable[Buffer], /) -> None:  # type: ignore[override,unused-ignore]
        """Write every item of ``lines`` to the lock file."""
        return self._file.writelines(lines)

    def seek(self, offset: int, whence: int = 0) -> int:
        """Delegate seek() to the underlying lock file object."""
        return self._file.seek(offset, whence)

    def tell(self) -> int:
        """Delegate tell() to the underlying lock file object."""
        return self._file.tell()

    def flush(self) -> None:
        """Flush the underlying lock file object."""
        return self._file.flush()

    def truncate(self, size: Optional[int] = None) -> int:
        """Delegate truncate() to the underlying lock file object."""
        return self._file.truncate(size)

    def fileno(self) -> int:
        """Return the file descriptor of the underlying lock file object."""
        return self._file.fileno()

    def isatty(self) -> bool:
        """Delegate isatty() to the underlying lock file object."""
        return self._file.isatty()

    def readable(self) -> bool:
        """Delegate readable() to the underlying lock file object."""
        return self._file.readable()

    def writable(self) -> bool:
        """Delegate writable() to the underlying lock file object."""
        return self._file.writable()

    def seekable(self) -> bool:
        """Delegate seekable() to the underlying lock file object."""
        return self._file.seekable()

    def __next__(self) -> bytes:
        """Return the next line from the underlying lock file object."""
        return next(self._file)