Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

143 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from collections.abc import Iterable, Iterator 

28from types import TracebackType 

29from typing import IO, Any, ClassVar, Literal, Optional, Union, overload 

30 

31if sys.version_info >= (3, 12): 

32 from collections.abc import Buffer 

33else: 

34 Buffer = Union[bytes, bytearray, memoryview] 

35 

36 

def ensure_dir_exists(
    dirname: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
) -> None:
    """Ensure a directory exists, creating it and any missing parents.

    Args:
      dirname: Path of the directory that must exist afterwards.

    Raises:
      FileExistsError: If ``dirname`` exists but is not a directory.
      OSError: If the directory (or one of its parents) cannot be created.
    """
    # exist_ok=True only tolerates an already existing *directory*. Catching
    # FileExistsError ourselves would also hide the case where a regular file
    # occupies the path, and the caller would be told the directory exists
    # when it does not.
    os.makedirs(dirname, exist_ok=True)

45 

46 

47def _fancy_rename(oldname: Union[str, bytes], newname: Union[str, bytes]) -> None: 

48 """Rename file with temporary backup file to rollback if rename fails.""" 

49 if not os.path.exists(newname): 

50 os.rename(oldname, newname) 

51 return 

52 

53 # Defer the tempfile import since it pulls in a lot of other things. 

54 import tempfile 

55 

56 # destination file exists 

57 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=str(oldname), dir=".") 

58 os.close(fd) 

59 os.remove(tmpfile) 

60 os.rename(newname, tmpfile) 

61 try: 

62 os.rename(oldname, newname) 

63 except OSError: 

64 os.rename(tmpfile, newname) 

65 raise 

66 os.remove(tmpfile) 

67 

68 

@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: Literal["wb"],
    bufsize: int = -1,
    mask: int = 0o644,
) -> "_GitFile": ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: Literal["rb"] = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> IO[bytes]: ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]: ...


def GitFile(
    filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]:
    """Open ``filename`` in a way that respects the git file locking protocol.

    Only binary read-only and binary write-only modes are accepted; append
    ("a"), update ("+") and text modes are rejected. Reading goes through the
    builtin ``open``. Writing returns a ``_GitFile``, which does not open the
    requested file itself, so the existing file can still be read while its
    replacement is being written.

    Args:
      filename: Path of the file to open.
      mode: File mode; must contain "b" and must not contain "a" or "+".
      bufsize: Buffer size handed to the underlying open call.
      mask: Permission bits for a newly created file (write mode only). The
        default makes created files user-writable and world-readable.

    Returns:
      A builtin file object for reads, or a ``_GitFile`` for writes.

    Raises:
      OSError: If ``mode`` requests append, read/write or text access.
    """
    # Checked in this order so that the first matching rule decides the
    # error message.
    mode_rules = (
        ("a" in mode, "append mode not supported for Git files"),
        ("+" in mode, "read/write mode not supported for Git files"),
        ("b" not in mode, "text mode not supported for Git files"),
    )
    for rejected, reason in mode_rules:
        if rejected:
            raise OSError(reason)

    if "w" not in mode:
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask)

127 

128 

class FileLocked(Exception):
    """Raised when the lock file for a path already exists."""

    def __init__(
        self,
        filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]],
        lockfilename: Union[str, bytes],
    ) -> None:
        """Record which file is locked and by which lock file.

        Both values are also passed on to ``Exception`` and are therefore
        available through ``args``.

        Args:
          filename: Name of the file that is locked
          lockfilename: Name of the lock file
        """
        super().__init__(filename, lockfilename)
        self.filename, self.lockfilename = filename, lockfilename

146 

147 

148class _GitFile(IO[bytes]): 

149 """File that follows the git locking protocol for writes. 

150 

151 All writes to a file foo will be written into foo.lock in the same 

152 directory, and the lockfile will be renamed to overwrite the original file 

153 on close. 

154 

155 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

156 released. Typically this will happen in a finally block. 

157 """ 

158 

159 _file: IO[bytes] 

160 _filename: Union[str, bytes] 

161 _lockfilename: Union[str, bytes] 

162 _closed: bool 

163 

164 PROXY_PROPERTIES: ClassVar[set[str]] = { 

165 "encoding", 

166 "errors", 

167 "mode", 

168 "name", 

169 "newlines", 

170 "softspace", 

171 } 

172 PROXY_METHODS: ClassVar[set[str]] = { 

173 "__iter__", 

174 "__next__", 

175 "flush", 

176 "fileno", 

177 "isatty", 

178 "read", 

179 "readable", 

180 "readline", 

181 "readlines", 

182 "seek", 

183 "seekable", 

184 "tell", 

185 "truncate", 

186 "writable", 

187 "write", 

188 "writelines", 

189 } 

190 

191 def __init__( 

192 self, 

193 filename: Union[str, bytes, os.PathLike[str], os.PathLike[bytes]], 

194 mode: str, 

195 bufsize: int, 

196 mask: int, 

197 ) -> None: 

198 # Convert PathLike to str/bytes for our internal use 

199 self._filename: Union[str, bytes] = os.fspath(filename) 

200 if isinstance(self._filename, bytes): 

201 self._lockfilename: Union[str, bytes] = self._filename + b".lock" 

202 else: 

203 self._lockfilename = self._filename + ".lock" 

204 try: 

205 fd = os.open( 

206 self._lockfilename, 

207 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

208 mask, 

209 ) 

210 except FileExistsError as exc: 

211 raise FileLocked(filename, self._lockfilename) from exc 

212 self._file = os.fdopen(fd, mode, bufsize) 

213 self._closed = False 

214 

215 def __iter__(self) -> Iterator[bytes]: 

216 """Iterate over lines in the file.""" 

217 return iter(self._file) 

218 

219 def abort(self) -> None: 

220 """Close and discard the lockfile without overwriting the target. 

221 

222 If the file is already closed, this is a no-op. 

223 """ 

224 if self._closed: 

225 return 

226 self._file.close() 

227 try: 

228 os.remove(self._lockfilename) 

229 self._closed = True 

230 except FileNotFoundError: 

231 # The file may have been removed already, which is ok. 

232 self._closed = True 

233 

234 def close(self) -> None: 

235 """Close this file, saving the lockfile over the original. 

236 

237 Note: If this method fails, it will attempt to delete the lockfile. 

238 However, it is not guaranteed to do so (e.g. if a filesystem 

239 becomes suddenly read-only), which will prevent future writes to 

240 this file until the lockfile is removed manually. 

241 

242 Raises: 

243 OSError: if the original file could not be overwritten. The 

244 lock file is still closed, so further attempts to write to the same 

245 file object will raise ValueError. 

246 """ 

247 if self._closed: 

248 return 

249 self._file.flush() 

250 os.fsync(self._file.fileno()) 

251 self._file.close() 

252 try: 

253 if getattr(os, "replace", None) is not None: 

254 os.replace(self._lockfilename, self._filename) 

255 else: 

256 if sys.platform != "win32": 

257 os.rename(self._lockfilename, self._filename) 

258 else: 

259 # Windows versions prior to Vista don't support atomic 

260 # renames 

261 _fancy_rename(self._lockfilename, self._filename) 

262 finally: 

263 self.abort() 

264 

265 def __del__(self) -> None: 

266 if not getattr(self, "_closed", True): 

267 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

268 self.abort() 

269 

270 def __enter__(self) -> "_GitFile": 

271 return self 

272 

273 def __exit__( 

274 self, 

275 exc_type: Optional[type[BaseException]], 

276 exc_val: Optional[BaseException], 

277 exc_tb: Optional[TracebackType], 

278 ) -> None: 

279 if exc_type is not None: 

280 self.abort() 

281 else: 

282 self.close() 

283 

284 def __fspath__(self) -> Union[str, bytes]: 

285 """Return the file path for os.fspath() compatibility.""" 

286 return self._filename 

287 

288 @property 

289 def closed(self) -> bool: 

290 """Return whether the file is closed.""" 

291 return self._closed 

292 

293 def __getattr__(self, name: str) -> Any: # noqa: ANN401 

294 """Proxy property calls to the underlying file.""" 

295 if name in self.PROXY_PROPERTIES: 

296 return getattr(self._file, name) 

297 raise AttributeError(name) 

298 

299 # Implement IO[bytes] methods by delegating to the underlying file 

300 def read(self, size: int = -1) -> bytes: 

301 return self._file.read(size) 

302 

303 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

304 # Python 3.9/3.10 have issues with IO[bytes] overload signatures 

305 def write(self, data: Buffer, /) -> int: # type: ignore[override,unused-ignore] 

306 return self._file.write(data) 

307 

308 def readline(self, size: int = -1) -> bytes: 

309 return self._file.readline(size) 

310 

311 def readlines(self, hint: int = -1) -> list[bytes]: 

312 return self._file.readlines(hint) 

313 

314 # TODO: Remove type: ignore when Python 3.10 support is dropped (Oct 2026) 

315 # Python 3.9/3.10 have issues with IO[bytes] overload signatures 

316 def writelines(self, lines: Iterable[Buffer], /) -> None: # type: ignore[override,unused-ignore] 

317 return self._file.writelines(lines) 

318 

319 def seek(self, offset: int, whence: int = 0) -> int: 

320 return self._file.seek(offset, whence) 

321 

322 def tell(self) -> int: 

323 return self._file.tell() 

324 

325 def flush(self) -> None: 

326 return self._file.flush() 

327 

328 def truncate(self, size: Optional[int] = None) -> int: 

329 return self._file.truncate(size) 

330 

331 def fileno(self) -> int: 

332 return self._file.fileno() 

333 

334 def isatty(self) -> bool: 

335 return self._file.isatty() 

336 

337 def readable(self) -> bool: 

338 return self._file.readable() 

339 

340 def writable(self) -> bool: 

341 return self._file.writable() 

342 

343 def seekable(self) -> bool: 

344 return self._file.seekable() 

345 

346 def __next__(self) -> bytes: 

347 return next(iter(self._file))