Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as public by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from typing import ClassVar, Union 

28 

29 

30def ensure_dir_exists(dirname) -> None: 

31 """Ensure a directory exists, creating if necessary.""" 

32 try: 

33 os.makedirs(dirname) 

34 except FileExistsError: 

35 pass 

36 

37 

38def _fancy_rename(oldname, newname) -> None: 

39 """Rename file with temporary backup file to rollback if rename fails.""" 

40 if not os.path.exists(newname): 

41 try: 

42 os.rename(oldname, newname) 

43 except OSError: 

44 raise 

45 return 

46 

47 # Defer the tempfile import since it pulls in a lot of other things. 

48 import tempfile 

49 

50 # destination file exists 

51 try: 

52 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=oldname, dir=".") 

53 os.close(fd) 

54 os.remove(tmpfile) 

55 except OSError: 

56 # either file could not be created (e.g. permission problem) 

57 # or could not be deleted (e.g. rude virus scanner) 

58 raise 

59 try: 

60 os.rename(newname, tmpfile) 

61 except OSError: 

62 raise # no rename occurred 

63 try: 

64 os.rename(oldname, newname) 

65 except OSError: 

66 os.rename(tmpfile, newname) 

67 raise 

68 os.remove(tmpfile) 

69 

70 

71def GitFile( 

72 filename: Union[str, bytes, os.PathLike], mode="rb", bufsize=-1, mask=0o644 

73): 

74 """Create a file object that obeys the git file locking protocol. 

75 

76 Returns: a builtin file object or a _GitFile object 

77 

78 Note: See _GitFile for a description of the file locking protocol. 

79 

80 Only read-only and write-only (binary) modes are supported; r+, w+, and a 

81 are not. To read and write from the same file, you can take advantage of 

82 the fact that opening a file for write does not actually open the file you 

83 request. 

84 

85 The default file mask makes any created files user-writable and 

86 world-readable. 

87 

88 """ 

89 if "a" in mode: 

90 raise OSError("append mode not supported for Git files") 

91 if "+" in mode: 

92 raise OSError("read/write mode not supported for Git files") 

93 if "b" not in mode: 

94 raise OSError("text mode not supported for Git files") 

95 if "w" in mode: 

96 return _GitFile(filename, mode, bufsize, mask) 

97 else: 

98 return open(filename, mode, bufsize) 

99 

100 

101class FileLocked(Exception): 

102 """File is already locked.""" 

103 

104 def __init__(self, filename, lockfilename) -> None: 

105 self.filename = filename 

106 self.lockfilename = lockfilename 

107 super().__init__(filename, lockfilename) 

108 

109 

110class _GitFile: 

111 """File that follows the git locking protocol for writes. 

112 

113 All writes to a file foo will be written into foo.lock in the same 

114 directory, and the lockfile will be renamed to overwrite the original file 

115 on close. 

116 

117 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

118 released. Typically this will happen in a finally block. 

119 """ 

120 

121 PROXY_PROPERTIES: ClassVar[set[str]] = { 

122 "closed", 

123 "encoding", 

124 "errors", 

125 "mode", 

126 "name", 

127 "newlines", 

128 "softspace", 

129 } 

130 PROXY_METHODS: ClassVar[set[str]] = { 

131 "__iter__", 

132 "flush", 

133 "fileno", 

134 "isatty", 

135 "read", 

136 "readline", 

137 "readlines", 

138 "seek", 

139 "tell", 

140 "truncate", 

141 "write", 

142 "writelines", 

143 } 

144 

145 def __init__( 

146 self, filename: Union[str, bytes, os.PathLike], mode, bufsize, mask 

147 ) -> None: 

148 # Convert PathLike to str/bytes for our internal use 

149 self._filename: Union[str, bytes] = os.fspath(filename) 

150 if isinstance(self._filename, bytes): 

151 self._lockfilename: Union[str, bytes] = self._filename + b".lock" 

152 else: 

153 self._lockfilename = self._filename + ".lock" 

154 try: 

155 fd = os.open( 

156 self._lockfilename, 

157 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

158 mask, 

159 ) 

160 except FileExistsError as exc: 

161 raise FileLocked(filename, self._lockfilename) from exc 

162 self._file = os.fdopen(fd, mode, bufsize) 

163 self._closed = False 

164 

165 for method in self.PROXY_METHODS: 

166 setattr(self, method, getattr(self._file, method)) 

167 

168 def abort(self) -> None: 

169 """Close and discard the lockfile without overwriting the target. 

170 

171 If the file is already closed, this is a no-op. 

172 """ 

173 if self._closed: 

174 return 

175 self._file.close() 

176 try: 

177 os.remove(self._lockfilename) 

178 self._closed = True 

179 except FileNotFoundError: 

180 # The file may have been removed already, which is ok. 

181 self._closed = True 

182 

183 def close(self) -> None: 

184 """Close this file, saving the lockfile over the original. 

185 

186 Note: If this method fails, it will attempt to delete the lockfile. 

187 However, it is not guaranteed to do so (e.g. if a filesystem 

188 becomes suddenly read-only), which will prevent future writes to 

189 this file until the lockfile is removed manually. 

190 

191 Raises: 

192 OSError: if the original file could not be overwritten. The 

193 lock file is still closed, so further attempts to write to the same 

194 file object will raise ValueError. 

195 """ 

196 if self._closed: 

197 return 

198 self._file.flush() 

199 os.fsync(self._file.fileno()) 

200 self._file.close() 

201 try: 

202 if getattr(os, "replace", None) is not None: 

203 os.replace(self._lockfilename, self._filename) 

204 else: 

205 if sys.platform != "win32": 

206 os.rename(self._lockfilename, self._filename) 

207 else: 

208 # Windows versions prior to Vista don't support atomic 

209 # renames 

210 _fancy_rename(self._lockfilename, self._filename) 

211 finally: 

212 self.abort() 

213 

214 def __del__(self) -> None: 

215 if not getattr(self, "_closed", True): 

216 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

217 self.abort() 

218 

219 def __enter__(self): 

220 return self 

221 

222 def __exit__(self, exc_type, exc_val, exc_tb): 

223 if exc_type is not None: 

224 self.abort() 

225 else: 

226 self.close() 

227 

228 def __getattr__(self, name): 

229 """Proxy property calls to the underlying file.""" 

230 if name in self.PROXY_PROPERTIES: 

231 return getattr(self._file, name) 

232 raise AttributeError(name)