Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/reflog.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1# reflog.py -- Parsing and writing reflog files 

2# Copyright (C) 2015 Jelmer Vernooij and others. 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Utilities for reading and generating reflogs.""" 

23 

24import collections 

25from collections.abc import Callable, Generator 

26from typing import IO, BinaryIO, Optional, Union 

27 

28from .file import _GitFile 

29from .objects import ZERO_SHA, format_timezone, parse_timezone 

30 

# One parsed reflog record. Field order matches the positional order in which
# parse_reflog_line() builds it and format_reflog_line() accepts it.
Entry = collections.namedtuple(
    "Entry",
    "old_sha new_sha committer timestamp timezone message",
)

35 

36 

def parse_reflog_spec(refspec: Union[str, bytes]) -> tuple[bytes, int]:
    """Split a reflog selector such as 'HEAD@{1}' into its ref and index.

    Args:
        refspec: Reflog selector, e.g. 'HEAD@{1}', 'refs/heads/master@{2}'
            or '@{0}'. A ``str`` is encoded as UTF-8 before parsing.

    Returns:
        Tuple of (ref_name, index). ``ref_name`` is bytes and defaults to
        ``b"HEAD"`` when the selector has nothing before ``@{``; ``index`` is
        in Git reflog order (0 = newest).

    Raises:
        ValueError: If the selector has no ``@{``, does not end with ``}``,
            or the text between the braces is not made of digits only.
    """
    spec = refspec.encode("utf-8") if isinstance(refspec, str) else refspec

    # Only the first "@{" separates the ref from the index part.
    ref, marker, tail = spec.partition(b"@{")
    if not marker or not tail.endswith(b"}"):
        raise ValueError(
            f"Invalid reflog spec: {spec!r}. Expected format: ref@{{n}}"
        )

    index_str = tail[:-1]
    if not index_str.isdigit():
        raise ValueError(
            f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}"
        )

    # A bare "@{n}" refers to HEAD.
    return (ref or b"HEAD"), int(index_str)

74 

75 

def format_reflog_line(
    old_sha: Optional[bytes],
    new_sha: bytes,
    committer: bytes,
    timestamp: Union[int, float],
    timezone: int,
    message: bytes,
) -> bytes:
    """Serialize one reflog record to its on-disk byte form.

    The result is the five header fields separated by single spaces, then a
    tab, then the message. No line terminator is appended here.

    Args:
        old_sha: Previous SHA; ``None`` is written as ``ZERO_SHA``
        new_sha: New SHA
        committer: Committer name and e-mail
        timestamp: Timestamp; truncated to an integer before writing
        timezone: Timezone, rendered with ``format_timezone``
        message: Reflog message

    Returns:
        The formatted reflog line as bytes
    """
    header_fields = (
        ZERO_SHA if old_sha is None else old_sha,
        new_sha,
        committer,
        str(int(timestamp)).encode("ascii"),
        format_timezone(timezone),
    )
    return b" ".join(header_fields) + b"\t" + message

109 

110 

def parse_reflog_line(line: bytes) -> Entry:
    """Decode a single reflog line into an ``Entry``.

    Args:
        line: Raw reflog line. Everything after the first tab is taken as the
            message unchanged, so a trailing newline is kept if present.

    Returns:
        Entry of (old_sha, new_sha, committer, timestamp, timezone, message)

    Raises:
        ValueError: If the line lacks the tab separator or the expected
            space-separated fields, or the timestamp is not an integer.
    """
    header, message = line.split(b"\t", 1)
    old_sha, new_sha, identity = header.split(b" ", 2)
    # The committer may itself contain spaces, so peel the two trailing
    # fields off from the right.
    committer, raw_timestamp, raw_timezone = identity.rsplit(b" ", 2)
    timestamp = int(raw_timestamp)
    # Only the first element of parse_timezone()'s result is stored.
    timezone = parse_timezone(raw_timezone)[0]
    return Entry(
        old_sha=old_sha,
        new_sha=new_sha,
        committer=committer,
        timestamp=timestamp,
        timezone=timezone,
        message=message,
    )

130 

131 

def read_reflog(
    f: Union[BinaryIO, IO[bytes], _GitFile],
) -> Generator[Entry, None, None]:
    """Lazily parse every line of a reflog.

    Args:
        f: File-like object yielding reflog lines as bytes

    Returns:
        Iterator over Entry objects, in file order. Trailing newlines are
        stripped from each line before parsing, so messages carry none.
    """
    yield from (parse_reflog_line(raw.rstrip(b"\n")) for raw in f)

143 

144 

def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
    """Drop the specified reflog entry, rewriting the file in place.

    Entries are read from the current position of ``f`` to the end. The
    file is then rewritten from the dropped entry's offset onwards and
    truncated.

    Args:
        f: Seekable, writable binary file-like object
        index: Reflog entry index (in Git reflog reverse 0-indexed order,
            i.e. 0 is the last line read)
        rewrite: If a reflog entry's predecessor is removed, set its
            old SHA to the new SHA of the entry that now precedes it

    Raises:
        ValueError: If ``index`` is negative
        IndexError: If ``index`` is not smaller than the number of entries
            read. Without this check a too-large index would wrap around via
            negative list indexing and silently drop the wrong entry.
    """
    if index < 0:
        raise ValueError(f"Invalid reflog index {index}")

    # Remember where each line starts so we know where to resume writing.
    # Lines are parsed with their newline intact: it stays at the end of
    # entry.message and is therefore written back by format_reflog_line().
    log = []
    offset = f.tell()
    for line in f:
        log.append((offset, parse_reflog_line(line)))
        offset = f.tell()

    if index >= len(log):
        raise IndexError(
            f"Reflog index {index} out of range for {len(log)} entries"
        )

    # Convert Git's newest-first index to a position in file order.
    inverse_index = len(log) - index - 1
    f.seek(log[inverse_index][0])

    if index == 0:
        # Newest entry is the last line: cutting the file here is enough.
        f.truncate()
        return

    del log[inverse_index]
    if rewrite:
        # log[inverse_index] is now the entry that followed the dropped one;
        # index > 0 guarantees it exists.
        if inverse_index == 0:
            previous_new = ZERO_SHA
        else:
            previous_new = log[inverse_index - 1][1].new_sha
        offset, entry = log[inverse_index]
        log[inverse_index] = (offset, entry._replace(old_sha=previous_new))

    for _, entry in log[inverse_index:]:
        f.write(format_reflog_line(*entry))
    f.truncate()

202 

203 

def expire_reflog(
    f: BinaryIO,
    expire_time: Optional[int] = None,
    expire_unreachable_time: Optional[int] = None,
    # String annotation to work around typing module bug in Python 3.9.0/3.9.1
    # See: https://github.com/jelmer/dulwich/issues/1948
    reachable_checker: "Optional[Callable[[bytes], bool]]" = None,
) -> int:
    """Expire reflog entries based on age and reachability.

    Entries are read from the current position of ``f`` to the end. If any
    entry expires, the surviving entries are written back starting at that
    same position and the file is truncated after them; otherwise the file
    is left untouched.

    Args:
        f: Seekable, writable binary file-like object for the reflog
        expire_time: Expire reachable entries older than this timestamp
            (seconds since epoch). If None, reachable entries are never
            expired.
        expire_unreachable_time: Expire unreachable entries older than this
            timestamp. If None, unreachable entries are not expired.
        reachable_checker: Optional callable that takes an entry's new SHA
            and returns True if the commit is reachable. If None, all
            entries are considered reachable.

    Returns:
        Number of entries expired
    """
    if expire_time is None and expire_unreachable_time is None:
        return 0

    # Write back from where reading started rather than from offset 0, so a
    # stream positioned past the start does not get its earlier bytes
    # overwritten.
    start_offset = f.tell()
    # Lines keep their newline inside entry.message, so formatting an entry
    # reproduces a complete line.
    entries = [parse_reflog_line(line) for line in f]

    kept_entries = []
    for entry in entries:
        is_reachable = reachable_checker is None or reachable_checker(
            entry.new_sha
        )
        # Each entry is judged against exactly one cutoff, chosen by
        # reachability; a missing cutoff means "never expire".
        cutoff = expire_time if is_reachable else expire_unreachable_time
        if cutoff is not None and entry.timestamp < cutoff:
            continue
        kept_entries.append(entry)

    expired_count = len(entries) - len(kept_entries)
    if expired_count > 0:
        f.seek(start_offset)
        for entry in kept_entries:
            f.write(format_reflog_line(*entry))
        f.truncate()

    return expired_count

282 

283 

def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]:
    """Yield the name of every ref that has a reflog file.

    Args:
        logs_dir: Path to the logs directory (e.g., .git/logs)

    Yields:
        Reference names (as UTF-8 encoded bytes), i.e. the path of each
        regular file below ``logs_dir`` relative to it, with ``/`` as the
        separator. Nothing is yielded when ``logs_dir`` does not exist.
    """
    import os
    from pathlib import Path

    if not os.path.exists(logs_dir):
        return

    root = Path(logs_dir)
    for candidate in root.rglob("*"):
        # Directories only provide the ref namespace; skip them.
        if not candidate.is_file():
            continue
        # as_posix() renders the relative path with forward slashes, which is
        # how ref names are written regardless of the platform separator.
        yield candidate.relative_to(root).as_posix().encode("utf-8")

306 yield ref_name.encode("utf-8")