Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/reflog.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# reflog.py -- Parsing and writing reflog files
2# Copyright (C) 2015 Jelmer Vernooij and others.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Utilities for reading and generating reflogs."""
24__all__ = [
25 "drop_reflog_entry",
26 "expire_reflog",
27 "format_reflog_line",
28 "iter_reflogs",
29 "parse_reflog_line",
30 "parse_reflog_spec",
31 "read_reflog",
32]
34import collections
35from collections.abc import Callable, Generator
36from typing import IO, BinaryIO
38from .file import _GitFile
39from .objects import ZERO_SHA, format_timezone, parse_timezone
41Entry = collections.namedtuple(
42 "Entry",
43 ["old_sha", "new_sha", "committer", "timestamp", "timezone", "message"],
44)
47def parse_reflog_spec(refspec: str | bytes) -> tuple[bytes, int]:
48 """Parse a reflog specification like 'HEAD@{1}' or 'refs/heads/master@{2}'.
50 Args:
51 refspec: Reflog specification (e.g., 'HEAD@{1}', 'master@{0}')
53 Returns:
54 Tuple of (ref_name, index) where index is in Git reflog order (0 = newest)
56 Raises:
57 ValueError: If the refspec is not a valid reflog specification
58 """
59 if isinstance(refspec, str):
60 refspec = refspec.encode("utf-8")
62 if b"@{" not in refspec:
63 raise ValueError(
64 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
65 )
67 ref, rest = refspec.split(b"@{", 1)
68 if not rest.endswith(b"}"):
69 raise ValueError(
70 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
71 )
73 index_str = rest[:-1]
74 if not index_str.isdigit():
75 raise ValueError(
76 f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}"
77 )
79 # Use HEAD if no ref specified (e.g., "@{1}")
80 if not ref:
81 ref = b"HEAD"
83 return ref, int(index_str)
86def format_reflog_line(
87 old_sha: bytes | None,
88 new_sha: bytes,
89 committer: bytes,
90 timestamp: int | float,
91 timezone: int,
92 message: bytes,
93) -> bytes:
94 """Generate a single reflog line.
96 Args:
97 old_sha: Old Commit SHA
98 new_sha: New Commit SHA
99 committer: Committer name and e-mail
100 timestamp: Timestamp
101 timezone: Timezone
102 message: Message
103 """
104 if old_sha is None:
105 old_sha = ZERO_SHA
106 return (
107 old_sha
108 + b" "
109 + new_sha
110 + b" "
111 + committer
112 + b" "
113 + str(int(timestamp)).encode("ascii")
114 + b" "
115 + format_timezone(timezone)
116 + b"\t"
117 + message
118 )
121def parse_reflog_line(line: bytes) -> Entry:
122 """Parse a reflog line.
124 Args:
125 line: Line to parse
126 Returns: Tuple of (old_sha, new_sha, committer, timestamp, timezone,
127 message)
128 """
129 (begin, message) = line.split(b"\t", 1)
130 (old_sha, new_sha, rest) = begin.split(b" ", 2)
131 (committer, timestamp_str, timezone_str) = rest.rsplit(b" ", 2)
132 return Entry(
133 old_sha,
134 new_sha,
135 committer,
136 int(timestamp_str),
137 parse_timezone(timezone_str)[0],
138 message,
139 )
142def read_reflog(
143 f: BinaryIO | IO[bytes] | _GitFile,
144) -> Generator[Entry, None, None]:
145 """Read reflog.
147 Args:
148 f: File-like object
149 Returns: Iterator over Entry objects
150 """
151 for line in f:
152 yield parse_reflog_line(line.rstrip(b"\n"))
155def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
156 """Drop the specified reflog entry.
158 Args:
159 f: File-like object
160 index: Reflog entry index (in Git reflog reverse 0-indexed order)
161 rewrite: If a reflog entry's predecessor is removed, set its
162 old SHA to the new SHA of the entry that now precedes it
163 """
164 if index < 0:
165 raise ValueError(f"Invalid reflog index {index}")
167 log = []
168 offset = f.tell()
169 for line in f:
170 log.append((offset, parse_reflog_line(line)))
171 offset = f.tell()
173 inverse_index = len(log) - index - 1
174 write_offset = log[inverse_index][0]
175 f.seek(write_offset)
177 if index == 0:
178 f.truncate()
179 return
181 del log[inverse_index]
182 if rewrite and index > 0 and log:
183 if inverse_index == 0:
184 previous_new = ZERO_SHA
185 else:
186 previous_new = log[inverse_index - 1][1].new_sha
187 offset, entry = log[inverse_index]
188 log[inverse_index] = (
189 offset,
190 Entry(
191 previous_new,
192 entry.new_sha,
193 entry.committer,
194 entry.timestamp,
195 entry.timezone,
196 entry.message,
197 ),
198 )
200 for _, entry in log[inverse_index:]:
201 f.write(
202 format_reflog_line(
203 entry.old_sha,
204 entry.new_sha,
205 entry.committer,
206 entry.timestamp,
207 entry.timezone,
208 entry.message,
209 )
210 )
211 f.truncate()
214def expire_reflog(
215 f: BinaryIO,
216 expire_time: int | None = None,
217 expire_unreachable_time: int | None = None,
218 reachable_checker: Callable[[bytes], bool] | None = None,
219) -> int:
220 """Expire reflog entries based on age and reachability.
222 Args:
223 f: File-like object for the reflog
224 expire_time: Expire entries older than this timestamp (seconds since epoch).
225 If None, entries are not expired based on age alone.
226 expire_unreachable_time: Expire unreachable entries older than this
227 timestamp. If None, unreachable entries are not expired.
228 reachable_checker: Optional callable that takes a SHA and returns True
229 if the commit is reachable. If None, all entries are considered
230 reachable.
232 Returns:
233 Number of entries expired
234 """
235 if expire_time is None and expire_unreachable_time is None:
236 return 0
238 entries = []
239 offset = f.tell()
240 for line in f:
241 entries.append((offset, parse_reflog_line(line)))
242 offset = f.tell()
244 # Filter entries that should be kept
245 kept_entries = []
246 expired_count = 0
248 for offset, entry in entries:
249 should_expire = False
251 # Check if entry is reachable
252 is_reachable = True
253 if reachable_checker is not None:
254 is_reachable = reachable_checker(entry.new_sha)
256 # Apply expiration rules
257 # Check the appropriate expiration time based on reachability
258 if is_reachable:
259 if expire_time is not None and entry.timestamp < expire_time:
260 should_expire = True
261 else:
262 if (
263 expire_unreachable_time is not None
264 and entry.timestamp < expire_unreachable_time
265 ):
266 should_expire = True
268 if should_expire:
269 expired_count += 1
270 else:
271 kept_entries.append((offset, entry))
273 # Write back the kept entries
274 if expired_count > 0:
275 f.seek(0)
276 for _, entry in kept_entries:
277 f.write(
278 format_reflog_line(
279 entry.old_sha,
280 entry.new_sha,
281 entry.committer,
282 entry.timestamp,
283 entry.timezone,
284 entry.message,
285 )
286 )
287 f.truncate()
289 return expired_count
292def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]:
293 """Iterate over all reflogs in a repository.
295 Args:
296 logs_dir: Path to the logs directory (e.g., .git/logs)
298 Yields:
299 Reference names (as bytes) that have reflogs
300 """
301 import os
302 from pathlib import Path
304 if not os.path.exists(logs_dir):
305 return
307 logs_path = Path(logs_dir)
308 for log_file in logs_path.rglob("*"):
309 if log_file.is_file():
310 # Get the ref name by removing the logs_dir prefix
311 ref_name = str(log_file.relative_to(logs_path))
312 # Convert path separators to / for refs
313 ref_name = ref_name.replace(os.sep, "/")
314 yield ref_name.encode("utf-8")