Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/file.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# file.py -- Safe access to git files 

2# Copyright (C) 2010 Google, Inc. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Safe access to git files.""" 

23 

24import os 

25import sys 

26import warnings 

27from typing import ClassVar, Union 

28 

29 

def ensure_dir_exists(dirname) -> None:
    """Ensure a directory exists, creating it (and any parents) if necessary.

    Args:
      dirname: Path of the directory that must exist afterwards.

    Raises:
      FileExistsError: if ``dirname`` already exists but is not a directory.
      OSError: if the directory could not be created.
    """
    # exist_ok=True tolerates an already-existing directory but, unlike
    # swallowing every FileExistsError, still reports a regular file sitting
    # at the requested path instead of silently pretending success.
    os.makedirs(dirname, exist_ok=True)

36 

37 

38def _fancy_rename(oldname, newname) -> None: 

39 """Rename file with temporary backup file to rollback if rename fails.""" 

40 if not os.path.exists(newname): 

41 os.rename(oldname, newname) 

42 return 

43 

44 # Defer the tempfile import since it pulls in a lot of other things. 

45 import tempfile 

46 

47 # destination file exists 

48 (fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=oldname, dir=".") 

49 os.close(fd) 

50 os.remove(tmpfile) 

51 os.rename(newname, tmpfile) 

52 try: 

53 os.rename(oldname, newname) 

54 except OSError: 

55 os.rename(tmpfile, newname) 

56 raise 

57 os.remove(tmpfile) 

58 

59 

def GitFile(
    filename: Union[str, bytes, os.PathLike], mode="rb", bufsize=-1, mask=0o644
):
    """Open a file in a way that honours the git file locking protocol.

    Returns: a builtin file object (for reads) or a _GitFile object (for
      writes)

    Note: See _GitFile for a description of the file locking protocol.

    Only binary read-only and write-only modes are accepted; append ("a"),
    read/write ("+") and text modes are rejected with OSError. Because a
    write does not open the requested file itself, the same path can still
    be opened for reading while a write is in progress.

    With the default file mask, created files are user-writable and
    world-readable.

    """
    # Checked in order; the first matching rule decides the error message.
    rejections = (
        ("a" in mode, "append mode not supported for Git files"),
        ("+" in mode, "read/write mode not supported for Git files"),
        ("b" not in mode, "text mode not supported for Git files"),
    )
    for is_rejected, reason in rejections:
        if is_rejected:
            raise OSError(reason)

    if "w" not in mode:
        # Reads need no lock: hand back a regular file object.
        return open(filename, mode, bufsize)
    return _GitFile(filename, mode, bufsize, mask)

88 

89 

class FileLocked(Exception):
    """Raised when the lock file for a path already exists.

    Attributes:
      filename: the path for which a lock was requested
      lockfilename: the path of the lock file that is already present
    """

    def __init__(self, filename, lockfilename) -> None:
        # Both paths are also forwarded to Exception so they appear in args.
        super().__init__(filename, lockfilename)
        self.filename, self.lockfilename = filename, lockfilename

97 

98 

99class _GitFile: 

100 """File that follows the git locking protocol for writes. 

101 

102 All writes to a file foo will be written into foo.lock in the same 

103 directory, and the lockfile will be renamed to overwrite the original file 

104 on close. 

105 

106 Note: You *must* call close() or abort() on a _GitFile for the lock to be 

107 released. Typically this will happen in a finally block. 

108 """ 

109 

110 PROXY_PROPERTIES: ClassVar[set[str]] = { 

111 "closed", 

112 "encoding", 

113 "errors", 

114 "mode", 

115 "name", 

116 "newlines", 

117 "softspace", 

118 } 

119 PROXY_METHODS: ClassVar[set[str]] = { 

120 "__iter__", 

121 "flush", 

122 "fileno", 

123 "isatty", 

124 "read", 

125 "readline", 

126 "readlines", 

127 "seek", 

128 "tell", 

129 "truncate", 

130 "write", 

131 "writelines", 

132 } 

133 

134 def __init__( 

135 self, filename: Union[str, bytes, os.PathLike], mode, bufsize, mask 

136 ) -> None: 

137 # Convert PathLike to str/bytes for our internal use 

138 self._filename: Union[str, bytes] = os.fspath(filename) 

139 if isinstance(self._filename, bytes): 

140 self._lockfilename: Union[str, bytes] = self._filename + b".lock" 

141 else: 

142 self._lockfilename = self._filename + ".lock" 

143 try: 

144 fd = os.open( 

145 self._lockfilename, 

146 os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0), 

147 mask, 

148 ) 

149 except FileExistsError as exc: 

150 raise FileLocked(filename, self._lockfilename) from exc 

151 self._file = os.fdopen(fd, mode, bufsize) 

152 self._closed = False 

153 

154 for method in self.PROXY_METHODS: 

155 setattr(self, method, getattr(self._file, method)) 

156 

157 def abort(self) -> None: 

158 """Close and discard the lockfile without overwriting the target. 

159 

160 If the file is already closed, this is a no-op. 

161 """ 

162 if self._closed: 

163 return 

164 self._file.close() 

165 try: 

166 os.remove(self._lockfilename) 

167 self._closed = True 

168 except FileNotFoundError: 

169 # The file may have been removed already, which is ok. 

170 self._closed = True 

171 

172 def close(self) -> None: 

173 """Close this file, saving the lockfile over the original. 

174 

175 Note: If this method fails, it will attempt to delete the lockfile. 

176 However, it is not guaranteed to do so (e.g. if a filesystem 

177 becomes suddenly read-only), which will prevent future writes to 

178 this file until the lockfile is removed manually. 

179 

180 Raises: 

181 OSError: if the original file could not be overwritten. The 

182 lock file is still closed, so further attempts to write to the same 

183 file object will raise ValueError. 

184 """ 

185 if self._closed: 

186 return 

187 self._file.flush() 

188 os.fsync(self._file.fileno()) 

189 self._file.close() 

190 try: 

191 if getattr(os, "replace", None) is not None: 

192 os.replace(self._lockfilename, self._filename) 

193 else: 

194 if sys.platform != "win32": 

195 os.rename(self._lockfilename, self._filename) 

196 else: 

197 # Windows versions prior to Vista don't support atomic 

198 # renames 

199 _fancy_rename(self._lockfilename, self._filename) 

200 finally: 

201 self.abort() 

202 

203 def __del__(self) -> None: 

204 if not getattr(self, "_closed", True): 

205 warnings.warn(f"unclosed {self!r}", ResourceWarning, stacklevel=2) 

206 self.abort() 

207 

208 def __enter__(self): 

209 return self 

210 

211 def __exit__(self, exc_type, exc_val, exc_tb): 

212 if exc_type is not None: 

213 self.abort() 

214 else: 

215 self.close() 

216 

217 def __getattr__(self, name): 

218 """Proxy property calls to the underlying file.""" 

219 if name in self.PROXY_PROPERTIES: 

220 return getattr(self._file, name) 

221 raise AttributeError(name)