Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/reflog.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

99 statements  

1# reflog.py -- Parsing and writing reflog files 

2# Copyright (C) 2015 Jelmer Vernooij and others. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for reading and generating reflogs.""" 

23 

24__all__ = [ 

25 "drop_reflog_entry", 

26 "expire_reflog", 

27 "format_reflog_line", 

28 "iter_reflogs", 

29 "parse_reflog_line", 

30 "parse_reflog_spec", 

31 "read_reflog", 

32] 

33 

34import collections 

35from collections.abc import Callable, Generator 

36from typing import IO, BinaryIO 

37 

38from .file import _GitFile 

39from .objects import ZERO_SHA, format_timezone, parse_timezone 

40 

41Entry = collections.namedtuple( 

42 "Entry", 

43 ["old_sha", "new_sha", "committer", "timestamp", "timezone", "message"], 

44) 

45 

46 

47def parse_reflog_spec(refspec: str | bytes) -> tuple[bytes, int]: 

48 """Parse a reflog specification like 'HEAD@{1}' or 'refs/heads/master@{2}'. 

49 

50 Args: 

51 refspec: Reflog specification (e.g., 'HEAD@{1}', 'master@{0}') 

52 

53 Returns: 

54 Tuple of (ref_name, index) where index is in Git reflog order (0 = newest) 

55 

56 Raises: 

57 ValueError: If the refspec is not a valid reflog specification 

58 """ 

59 if isinstance(refspec, str): 

60 refspec = refspec.encode("utf-8") 

61 

62 if b"@{" not in refspec: 

63 raise ValueError( 

64 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}" 

65 ) 

66 

67 ref, rest = refspec.split(b"@{", 1) 

68 if not rest.endswith(b"}"): 

69 raise ValueError( 

70 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}" 

71 ) 

72 

73 index_str = rest[:-1] 

74 if not index_str.isdigit(): 

75 raise ValueError( 

76 f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}" 

77 ) 

78 

79 # Use HEAD if no ref specified (e.g., "@{1}") 

80 if not ref: 

81 ref = b"HEAD" 

82 

83 return ref, int(index_str) 

84 

85 

86def format_reflog_line( 

87 old_sha: bytes | None, 

88 new_sha: bytes, 

89 committer: bytes, 

90 timestamp: int | float, 

91 timezone: int, 

92 message: bytes, 

93) -> bytes: 

94 """Generate a single reflog line. 

95 

96 Args: 

97 old_sha: Old Commit SHA 

98 new_sha: New Commit SHA 

99 committer: Committer name and e-mail 

100 timestamp: Timestamp 

101 timezone: Timezone 

102 message: Message 

103 """ 

104 if old_sha is None: 

105 old_sha = ZERO_SHA 

106 return ( 

107 old_sha 

108 + b" " 

109 + new_sha 

110 + b" " 

111 + committer 

112 + b" " 

113 + str(int(timestamp)).encode("ascii") 

114 + b" " 

115 + format_timezone(timezone) 

116 + b"\t" 

117 + message 

118 ) 

119 

120 

121def parse_reflog_line(line: bytes) -> Entry: 

122 """Parse a reflog line. 

123 

124 Args: 

125 line: Line to parse 

126 Returns: Tuple of (old_sha, new_sha, committer, timestamp, timezone, 

127 message) 

128 """ 

129 (begin, message) = line.split(b"\t", 1) 

130 (old_sha, new_sha, rest) = begin.split(b" ", 2) 

131 (committer, timestamp_str, timezone_str) = rest.rsplit(b" ", 2) 

132 return Entry( 

133 old_sha, 

134 new_sha, 

135 committer, 

136 int(timestamp_str), 

137 parse_timezone(timezone_str)[0], 

138 message, 

139 ) 

140 

141 

142def read_reflog( 

143 f: BinaryIO | IO[bytes] | _GitFile, 

144) -> Generator[Entry, None, None]: 

145 """Read reflog. 

146 

147 Args: 

148 f: File-like object 

149 Returns: Iterator over Entry objects 

150 """ 

151 for line in f: 

152 yield parse_reflog_line(line.rstrip(b"\n")) 

153 

154 

155def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None: 

156 """Drop the specified reflog entry. 

157 

158 Args: 

159 f: File-like object 

160 index: Reflog entry index (in Git reflog reverse 0-indexed order) 

161 rewrite: If a reflog entry's predecessor is removed, set its 

162 old SHA to the new SHA of the entry that now precedes it 

163 """ 

164 if index < 0: 

165 raise ValueError(f"Invalid reflog index {index}") 

166 

167 log = [] 

168 offset = f.tell() 

169 for line in f: 

170 log.append((offset, parse_reflog_line(line))) 

171 offset = f.tell() 

172 

173 inverse_index = len(log) - index - 1 

174 write_offset = log[inverse_index][0] 

175 f.seek(write_offset) 

176 

177 if index == 0: 

178 f.truncate() 

179 return 

180 

181 del log[inverse_index] 

182 if rewrite and index > 0 and log: 

183 if inverse_index == 0: 

184 previous_new = ZERO_SHA 

185 else: 

186 previous_new = log[inverse_index - 1][1].new_sha 

187 offset, entry = log[inverse_index] 

188 log[inverse_index] = ( 

189 offset, 

190 Entry( 

191 previous_new, 

192 entry.new_sha, 

193 entry.committer, 

194 entry.timestamp, 

195 entry.timezone, 

196 entry.message, 

197 ), 

198 ) 

199 

200 for _, entry in log[inverse_index:]: 

201 f.write( 

202 format_reflog_line( 

203 entry.old_sha, 

204 entry.new_sha, 

205 entry.committer, 

206 entry.timestamp, 

207 entry.timezone, 

208 entry.message, 

209 ) 

210 ) 

211 f.truncate() 

212 

213 

214def expire_reflog( 

215 f: BinaryIO, 

216 expire_time: int | None = None, 

217 expire_unreachable_time: int | None = None, 

218 reachable_checker: Callable[[bytes], bool] | None = None, 

219) -> int: 

220 """Expire reflog entries based on age and reachability. 

221 

222 Args: 

223 f: File-like object for the reflog 

224 expire_time: Expire entries older than this timestamp (seconds since epoch). 

225 If None, entries are not expired based on age alone. 

226 expire_unreachable_time: Expire unreachable entries older than this 

227 timestamp. If None, unreachable entries are not expired. 

228 reachable_checker: Optional callable that takes a SHA and returns True 

229 if the commit is reachable. If None, all entries are considered 

230 reachable. 

231 

232 Returns: 

233 Number of entries expired 

234 """ 

235 if expire_time is None and expire_unreachable_time is None: 

236 return 0 

237 

238 entries = [] 

239 offset = f.tell() 

240 for line in f: 

241 entries.append((offset, parse_reflog_line(line))) 

242 offset = f.tell() 

243 

244 # Filter entries that should be kept 

245 kept_entries = [] 

246 expired_count = 0 

247 

248 for offset, entry in entries: 

249 should_expire = False 

250 

251 # Check if entry is reachable 

252 is_reachable = True 

253 if reachable_checker is not None: 

254 is_reachable = reachable_checker(entry.new_sha) 

255 

256 # Apply expiration rules 

257 # Check the appropriate expiration time based on reachability 

258 if is_reachable: 

259 if expire_time is not None and entry.timestamp < expire_time: 

260 should_expire = True 

261 else: 

262 if ( 

263 expire_unreachable_time is not None 

264 and entry.timestamp < expire_unreachable_time 

265 ): 

266 should_expire = True 

267 

268 if should_expire: 

269 expired_count += 1 

270 else: 

271 kept_entries.append((offset, entry)) 

272 

273 # Write back the kept entries 

274 if expired_count > 0: 

275 f.seek(0) 

276 for _, entry in kept_entries: 

277 f.write( 

278 format_reflog_line( 

279 entry.old_sha, 

280 entry.new_sha, 

281 entry.committer, 

282 entry.timestamp, 

283 entry.timezone, 

284 entry.message, 

285 ) 

286 ) 

287 f.truncate() 

288 

289 return expired_count 

290 

291 

292def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]: 

293 """Iterate over all reflogs in a repository. 

294 

295 Args: 

296 logs_dir: Path to the logs directory (e.g., .git/logs) 

297 

298 Yields: 

299 Reference names (as bytes) that have reflogs 

300 """ 

301 import os 

302 from pathlib import Path 

303 

304 if not os.path.exists(logs_dir): 

305 return 

306 

307 logs_path = Path(logs_dir) 

308 for log_file in logs_path.rglob("*"): 

309 if log_file.is_file(): 

310 # Get the ref name by removing the logs_dir prefix 

311 ref_name = str(log_file.relative_to(logs_path)) 

312 # Convert path separators to / for refs 

313 ref_name = ref_name.replace(os.sep, "/") 

314 yield ref_name.encode("utf-8")