Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from collections.abc import Iterator 

28from types import TracebackType 

29from typing import IO, Any, ClassVar, Literal, Optional, Union, overload 

30 

31 

def ensure_dir_exists(dirname: Union[str, bytes, os.PathLike]) -> None:
    """Create ``dirname`` (and any missing parents) unless it already exists.

    Args:
      dirname: Path of the directory that should be present afterwards.

    Note: A ``FileExistsError`` from ``os.makedirs`` is swallowed, whatever
      kind of filesystem entry already occupies the path. Every other
      ``OSError`` (permissions, read-only filesystem, ...) propagates.
    """
    try:
        os.makedirs(dirname)
    except FileExistsError:
        # Something is already at that path, so there is nothing to create.
        return

38 

39 

def _fancy_rename(oldname: Union[str, bytes], newname: Union[str, bytes]) -> None:
    """Rename ``oldname`` to ``newname``, keeping a backup to roll back on failure.

    If ``newname`` does not exist this is a plain ``os.rename``. Otherwise the
    existing destination is first moved to a uniquely named backup file next
    to ``oldname``; if the real rename then fails, the backup is moved back
    into place and the error is re-raised. On success the backup is deleted.

    Args:
      oldname: Path of the file to move (``str`` or ``bytes``).
      newname: Path the file should end up at (``str`` or ``bytes``).

    Raises:
      OSError: if any of the underlying rename/remove operations fails.
    """
    if not os.path.exists(newname):
        os.rename(oldname, newname)
        return

    # Defer the tempfile import since it pulls in a lot of other things.
    import tempfile

    # Decode before building the backup name: str() on a bytes path yields
    # "b'...'", which used to produce a prefix pointing at a non-existent
    # directory and made the rename fail for bytes paths.
    old_path = os.fsdecode(oldname)

    # destination file exists; reserve a unique backup name beside oldname
    (fd, tmpfile) = tempfile.mkstemp(
        ".tmp",
        prefix=os.path.basename(old_path),
        dir=os.path.dirname(old_path) or ".",
    )
    os.close(fd)
    # Only the unique name is needed; the placeholder file itself must go so
    # that the rename below also works on platforms that refuse to overwrite.
    os.remove(tmpfile)
    os.rename(newname, tmpfile)
    try:
        os.rename(oldname, newname)
    except OSError:
        # Roll back: put the previous destination contents back in place.
        os.rename(tmpfile, newname)
        raise
    os.remove(tmpfile)

60 

61 

@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: Literal["wb"],
    bufsize: int = -1,
    mask: int = 0o644,
) -> "_GitFile": ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: Literal["rb"] = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> IO[bytes]: ...


@overload
def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]: ...


def GitFile(
    filename: Union[str, bytes, os.PathLike],
    mode: str = "rb",
    bufsize: int = -1,
    mask: int = 0o644,
) -> Union[IO[bytes], "_GitFile"]:
    """Open a git file, using the lockfile protocol when writing.

    Args:
      filename: Path of the file to open.
      mode: Binary read (``"rb"``) or binary write (``"wb"``) mode.
      bufsize: Buffer size handed to the underlying file object.
      mask: Permission bits for a newly created file; only used for writes.

    Returns: a builtin file object for reads, or a _GitFile object for writes.

    Raises:
      OSError: if ``mode`` requests append, read/write ("+") or text access.

    Note: See _GitFile for a description of the file locking protocol.

    Only read-only and write-only (binary) modes are supported; r+, w+, and a
    are not. To read and write from the same file, you can take advantage of
    the fact that opening a file for write does not actually open the file you
    request.

    The default file mask makes any created files user-writable and
    world-readable.
    """
    # Ordered so that the first matching rule decides the error message.
    rejections = (
        ("a" in mode, "append mode not supported for Git files"),
        ("+" in mode, "read/write mode not supported for Git files"),
        ("b" not in mode, "text mode not supported for Git files"),
    )
    for is_rejected, reason in rejections:
        if is_rejected:
            raise OSError(reason)

    if "w" not in mode:
        # Reads need no locking; hand back a plain file object.
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask)

120 

121 

class FileLocked(Exception):
    """Raised when the lock file guarding a target file already exists.

    Both paths are stored as attributes and also passed on as the exception
    ``args``.
    """

    def __init__(
        self, filename: Union[str, bytes, os.PathLike], lockfilename: Union[str, bytes]
    ) -> None:
        """Record which file was requested and which lock file blocked it.

        Args:
          filename: Path of the file the caller tried to open for writing.
          lockfilename: Path of the lock file that was already present.
        """
        super().__init__(filename, lockfilename)
        # filename: the target as given by the caller
        self.filename = filename
        # lockfilename: the pre-existing lock that caused the failure
        self.lockfilename = lockfilename

131 

132 

class _GitFile:
    """File that follows the git locking protocol for writes.

    All writes to a file foo will be written into foo.lock in the same
    directory, and the lockfile will be renamed to overwrite the original file
    on close.

    Note: You *must* call close() or abort() on a _GitFile for the lock to be
        released. Typically this will happen in a finally block.
    """

    # Attribute names that __getattr__ forwards to the underlying file object.
    PROXY_PROPERTIES: ClassVar[set[str]] = {
        "closed",
        "encoding",
        "errors",
        "mode",
        "name",
        "newlines",
        "softspace",
    }
    # Method names copied from the underlying file object onto each instance.
    PROXY_METHODS: ClassVar[set[str]] = {
        "__iter__",
        "flush",
        "fileno",
        "isatty",
        "read",
        "readline",
        "readlines",
        "seek",
        "tell",
        "truncate",
        "write",
        "writelines",
    }

    def __init__(
        self,
        filename: Union[str, bytes, os.PathLike],
        mode: str,
        bufsize: int,
        mask: int,
    ) -> None:
        """Acquire the lock for ``filename`` and open the lockfile for writing.

        Args:
          filename: Path of the file that will be replaced on close().
          mode: Mode string handed to ``os.fdopen`` for the lockfile.
          bufsize: Buffer size handed to ``os.fdopen``.
          mask: Permission bits used when creating the lockfile.

        Raises:
          FileLocked: if the lockfile already exists.
        """
        # Convert PathLike to str/bytes for our internal use
        self._filename: Union[str, bytes] = os.fspath(filename)
        if isinstance(self._filename, bytes):
            self._lockfilename: Union[str, bytes] = self._filename + b".lock"
        else:
            self._lockfilename = self._filename + ".lock"
        try:
            # O_EXCL makes creation fail if the lockfile is already there,
            # which is what turns the file into a lock.
            fd = os.open(
                self._lockfilename,
                os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0),
                mask,
            )
        except FileExistsError as exc:
            raise FileLocked(filename, self._lockfilename) from exc
        try:
            self._file = os.fdopen(fd, mode, bufsize)
        except BaseException:
            # We created the lockfile but cannot use it (e.g. invalid mode).
            # Release the descriptor and the lock so the target does not stay
            # locked forever.
            try:
                os.close(fd)
            except OSError:
                # The descriptor may already have been closed while the
                # failed file object was being torn down.
                pass
            try:
                os.remove(self._lockfilename)
            except FileNotFoundError:
                pass
            raise
        self._closed = False

        for method in self.PROXY_METHODS:
            setattr(self, method, getattr(self._file, method))

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over lines in the file."""
        return iter(self._file)

    def abort(self) -> None:
        """Close and discard the lockfile without overwriting the target.

        If the file is already closed, this is a no-op.
        """
        if self._closed:
            return
        try:
            self._file.close()
        finally:
            # Remove the lock even if closing the buffered file failed, so a
            # failed flush does not leave the target locked.
            try:
                os.remove(self._lockfilename)
            except FileNotFoundError:
                # The file may have been removed already, which is ok.
                pass
            # Only reached when the lockfile is gone; any other removal error
            # propagates and leaves the object open so abort() can be retried.
            self._closed = True

    def close(self) -> None:
        """Close this file, saving the lockfile over the original.

        Note: If this method fails, it will attempt to delete the lockfile.
            However, it is not guaranteed to do so (e.g. if a filesystem
            becomes suddenly read-only), which will prevent future writes to
            this file until the lockfile is removed manually.

        Raises:
          OSError: if the original file could not be overwritten. The
            lock file is still closed, so further attempts to write to the same
            file object will raise ValueError.
        """
        if self._closed:
            return
        try:
            # Make sure the data is on disk before it replaces the target.
            self._file.flush()
            os.fsync(self._file.fileno())
            self._file.close()
            if getattr(os, "replace", None) is not None:
                os.replace(self._lockfilename, self._filename)
            elif sys.platform != "win32":
                os.rename(self._lockfilename, self._filename)
            else:
                # Windows versions prior to Vista don't support atomic
                # renames
                _fancy_rename(self._lockfilename, self._filename)
        except BaseException:
            # Anything went wrong: discard the lockfile, then report.
            self.abort()
            raise
        # The rename consumed the lockfile, which released the lock. Do not
        # try to remove it again: a lockfile with that name may now belong to
        # another writer.
        self._closed = True

    def __del__(self) -> None:
        """Warn about and abort a file that was never closed."""
        # _closed is missing when __init__ failed early; treat that as closed.
        if not getattr(self, "_closed", True):
            warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2)
            self.abort()

    def __enter__(self) -> "_GitFile":
        """Enter the context manager, returning this file."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Commit the write on a clean exit, discard it if an exception occurred."""
        if exc_type is not None:
            self.abort()
        else:
            self.close()

    def __getattr__(self, name: str) -> Any:  # noqa: ANN401
        """Proxy property calls to the underlying file."""
        if name in self.PROXY_PROPERTIES:
            return getattr(self._file, name)
        raise AttributeError(name)

    def readable(self) -> bool:
        """Return whether the file is readable."""
        return self._file.readable()

    def writable(self) -> bool:
        """Return whether the file is writable."""
        return self._file.writable()

    def seekable(self) -> bool:
        """Return whether the file is seekable."""
        return self._file.seekable()