Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/reflog.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1# reflog.py -- Parsing and writing reflog files 

2# Copyright (C) 2015 Jelmer Vernooij and others. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for reading and generating reflogs.""" 

23 

24import collections 

25from collections.abc import Callable, Generator 

26from typing import IO, BinaryIO 

27 

28from .file import _GitFile 

29from .objects import ZERO_SHA, format_timezone, parse_timezone 

30 

# One parsed reflog line; fields appear in the order they are stored on disk.
Entry = collections.namedtuple(
    "Entry",
    "old_sha new_sha committer timestamp timezone message",
)

35 

36 

37def parse_reflog_spec(refspec: str | bytes) -> tuple[bytes, int]: 

38 """Parse a reflog specification like 'HEAD@{1}' or 'refs/heads/master@{2}'. 

39 

40 Args: 

41 refspec: Reflog specification (e.g., 'HEAD@{1}', 'master@{0}') 

42 

43 Returns: 

44 Tuple of (ref_name, index) where index is in Git reflog order (0 = newest) 

45 

46 Raises: 

47 ValueError: If the refspec is not a valid reflog specification 

48 """ 

49 if isinstance(refspec, str): 

50 refspec = refspec.encode("utf-8") 

51 

52 if b"@{" not in refspec: 

53 raise ValueError( 

54 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}" 

55 ) 

56 

57 ref, rest = refspec.split(b"@{", 1) 

58 if not rest.endswith(b"}"): 

59 raise ValueError( 

60 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}" 

61 ) 

62 

63 index_str = rest[:-1] 

64 if not index_str.isdigit(): 

65 raise ValueError( 

66 f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}" 

67 ) 

68 

69 # Use HEAD if no ref specified (e.g., "@{1}") 

70 if not ref: 

71 ref = b"HEAD" 

72 

73 return ref, int(index_str) 

74 

75 

def format_reflog_line(
    old_sha: bytes | None,
    new_sha: bytes,
    committer: bytes,
    timestamp: int | float,
    timezone: int,
    message: bytes,
) -> bytes:
    """Serialize one reflog entry into its line representation.

    Args:
        old_sha: Old commit SHA; None is written as ZERO_SHA
        new_sha: New commit SHA
        committer: Committer name and e-mail
        timestamp: Timestamp; truncated to an integer before writing
        timezone: Timezone, rendered through format_timezone
        message: Message, written verbatim after a tab

    Returns:
        The space-separated fields, a tab, and the message. No line
        terminator is added here.
    """
    previous = ZERO_SHA if old_sha is None else old_sha
    seconds = str(int(timestamp)).encode("ascii")
    header = b" ".join(
        (previous, new_sha, committer, seconds, format_timezone(timezone))
    )
    return header + b"\t" + message

109 

110 

def parse_reflog_line(line: bytes) -> Entry:
    """Break a single reflog line into its fields.

    Args:
        line: Line to parse

    Returns:
        Entry of (old_sha, new_sha, committer, timestamp, timezone,
        message). Everything after the first tab is the message and is
        returned unmodified.
    """
    header, message = line.split(b"\t", 1)
    old_sha, new_sha, identity = header.split(b" ", 2)
    # The committer may itself contain spaces, so peel the two numeric
    # fields off the right-hand side.
    committer, raw_timestamp, raw_timezone = identity.rsplit(b" ", 2)
    timestamp = int(raw_timestamp)
    tz_offset = parse_timezone(raw_timezone)[0]
    return Entry(
        old_sha=old_sha,
        new_sha=new_sha,
        committer=committer,
        timestamp=timestamp,
        timezone=tz_offset,
        message=message,
    )

130 

131 

def read_reflog(
    f: BinaryIO | IO[bytes] | _GitFile,
) -> Generator[Entry, None, None]:
    """Lazily parse every line of a reflog.

    Args:
        f: File-like object yielding reflog lines as bytes

    Returns:
        Iterator over Entry objects, one per line, in file order.
        Trailing newline characters are stripped before parsing.
    """
    yield from (parse_reflog_line(raw.rstrip(b"\n")) for raw in f)

143 

144 

def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
    """Drop the specified reflog entry, rewriting the file in place.

    Lines are read from the current position of ``f`` to the end. The file
    is then rewritten from the start of the dropped line onwards and
    truncated.

    Args:
        f: File-like object, opened for binary reading and writing
        index: Reflog entry index (in Git reflog reverse 0-indexed order,
            i.e. 0 is the last line read)
        rewrite: If a reflog entry's predecessor is removed, set its
            old SHA to the new SHA of the entry that now precedes it
            (or ZERO_SHA when it becomes the first entry)

    Raises:
        ValueError: If index is negative
        IndexError: If index does not refer to an existing entry
    """
    if index < 0:
        raise ValueError(f"Invalid reflog index {index}")

    # Remember where each line starts so we know where to resume writing.
    raw_lines = []
    offset = f.tell()
    for line in f:
        raw_lines.append((offset, line))
        offset = f.tell()

    # Without this check a too-large index produces a negative list
    # position, which wraps around and silently removes the wrong entry.
    if index >= len(raw_lines):
        raise IndexError(
            f"Reflog index {index} out of range ({len(raw_lines)} entries)"
        )

    log = [(start, parse_reflog_line(raw)) for start, raw in raw_lines]

    # Reflog indexes count back from the newest (last) line.
    inverse_index = len(log) - index - 1
    f.seek(log[inverse_index][0])

    if index == 0:
        # The newest entry is the tail of the file: cutting it off suffices.
        f.truncate()
        return

    del log[inverse_index]
    # index > 0 guarantees at least one entry follows the dropped one, so
    # log[inverse_index] is now the entry that came right after it.
    if rewrite:
        if inverse_index == 0:
            previous_new = ZERO_SHA
        else:
            previous_new = log[inverse_index - 1][1].new_sha
        start, entry = log[inverse_index]
        log[inverse_index] = (start, entry._replace(old_sha=previous_new))

    # Entry fields are declared in format_reflog_line's parameter order.
    for _, entry in log[inverse_index:]:
        f.write(format_reflog_line(*entry))
    f.truncate()

202 

203 

204def expire_reflog( 

205 f: BinaryIO, 

206 expire_time: int | None = None, 

207 expire_unreachable_time: int | None = None, 

208 reachable_checker: Callable[[bytes], bool] | None = None, 

209) -> int: 

210 """Expire reflog entries based on age and reachability. 

211 

212 Args: 

213 f: File-like object for the reflog 

214 expire_time: Expire entries older than this timestamp (seconds since epoch). 

215 If None, entries are not expired based on age alone. 

216 expire_unreachable_time: Expire unreachable entries older than this 

217 timestamp. If None, unreachable entries are not expired. 

218 reachable_checker: Optional callable that takes a SHA and returns True 

219 if the commit is reachable. If None, all entries are considered 

220 reachable. 

221 

222 Returns: 

223 Number of entries expired 

224 """ 

225 if expire_time is None and expire_unreachable_time is None: 

226 return 0 

227 

228 entries = [] 

229 offset = f.tell() 

230 for line in f: 

231 entries.append((offset, parse_reflog_line(line))) 

232 offset = f.tell() 

233 

234 # Filter entries that should be kept 

235 kept_entries = [] 

236 expired_count = 0 

237 

238 for offset, entry in entries: 

239 should_expire = False 

240 

241 # Check if entry is reachable 

242 is_reachable = True 

243 if reachable_checker is not None: 

244 is_reachable = reachable_checker(entry.new_sha) 

245 

246 # Apply expiration rules 

247 # Check the appropriate expiration time based on reachability 

248 if is_reachable: 

249 if expire_time is not None and entry.timestamp < expire_time: 

250 should_expire = True 

251 else: 

252 if ( 

253 expire_unreachable_time is not None 

254 and entry.timestamp < expire_unreachable_time 

255 ): 

256 should_expire = True 

257 

258 if should_expire: 

259 expired_count += 1 

260 else: 

261 kept_entries.append((offset, entry)) 

262 

263 # Write back the kept entries 

264 if expired_count > 0: 

265 f.seek(0) 

266 for _, entry in kept_entries: 

267 f.write( 

268 format_reflog_line( 

269 entry.old_sha, 

270 entry.new_sha, 

271 entry.committer, 

272 entry.timestamp, 

273 entry.timezone, 

274 entry.message, 

275 ) 

276 ) 

277 f.truncate() 

278 

279 return expired_count 

280 

281 

def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]:
    """Walk a logs directory and report every ref that has a reflog.

    Args:
        logs_dir: Path to the logs directory (e.g., .git/logs)

    Yields:
        Reference names (as bytes, UTF-8 encoded, '/'-separated) for each
        regular file found anywhere below logs_dir. Nothing is yielded
        when logs_dir does not exist.
    """
    import os
    from pathlib import Path

    if os.path.exists(logs_dir):
        root = Path(logs_dir)
        for candidate in root.rglob("*"):
            if not candidate.is_file():
                continue
            # The path relative to the logs directory, with '/' separators,
            # is the ref name.
            yield candidate.relative_to(root).as_posix().encode("utf-8")