Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/reflog.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# reflog.py -- Parsing and writing reflog files
2# Copyright (C) 2015 Jelmer Vernooij and others.
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Utilities for reading and generating reflogs."""
24import collections
25from collections.abc import Callable, Generator
26from typing import IO, BinaryIO, Optional, Union
28from .file import _GitFile
29from .objects import ZERO_SHA, format_timezone, parse_timezone
31Entry = collections.namedtuple(
32 "Entry",
33 ["old_sha", "new_sha", "committer", "timestamp", "timezone", "message"],
34)
37def parse_reflog_spec(refspec: Union[str, bytes]) -> tuple[bytes, int]:
38 """Parse a reflog specification like 'HEAD@{1}' or 'refs/heads/master@{2}'.
40 Args:
41 refspec: Reflog specification (e.g., 'HEAD@{1}', 'master@{0}')
43 Returns:
44 Tuple of (ref_name, index) where index is in Git reflog order (0 = newest)
46 Raises:
47 ValueError: If the refspec is not a valid reflog specification
48 """
49 if isinstance(refspec, str):
50 refspec = refspec.encode("utf-8")
52 if b"@{" not in refspec:
53 raise ValueError(
54 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
55 )
57 ref, rest = refspec.split(b"@{", 1)
58 if not rest.endswith(b"}"):
59 raise ValueError(
60 f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
61 )
63 index_str = rest[:-1]
64 if not index_str.isdigit():
65 raise ValueError(
66 f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}"
67 )
69 # Use HEAD if no ref specified (e.g., "@{1}")
70 if not ref:
71 ref = b"HEAD"
73 return ref, int(index_str)
76def format_reflog_line(
77 old_sha: Optional[bytes],
78 new_sha: bytes,
79 committer: bytes,
80 timestamp: Union[int, float],
81 timezone: int,
82 message: bytes,
83) -> bytes:
84 """Generate a single reflog line.
86 Args:
87 old_sha: Old Commit SHA
88 new_sha: New Commit SHA
89 committer: Committer name and e-mail
90 timestamp: Timestamp
91 timezone: Timezone
92 message: Message
93 """
94 if old_sha is None:
95 old_sha = ZERO_SHA
96 return (
97 old_sha
98 + b" "
99 + new_sha
100 + b" "
101 + committer
102 + b" "
103 + str(int(timestamp)).encode("ascii")
104 + b" "
105 + format_timezone(timezone)
106 + b"\t"
107 + message
108 )
111def parse_reflog_line(line: bytes) -> Entry:
112 """Parse a reflog line.
114 Args:
115 line: Line to parse
116 Returns: Tuple of (old_sha, new_sha, committer, timestamp, timezone,
117 message)
118 """
119 (begin, message) = line.split(b"\t", 1)
120 (old_sha, new_sha, rest) = begin.split(b" ", 2)
121 (committer, timestamp_str, timezone_str) = rest.rsplit(b" ", 2)
122 return Entry(
123 old_sha,
124 new_sha,
125 committer,
126 int(timestamp_str),
127 parse_timezone(timezone_str)[0],
128 message,
129 )
132def read_reflog(
133 f: Union[BinaryIO, IO[bytes], _GitFile],
134) -> Generator[Entry, None, None]:
135 """Read reflog.
137 Args:
138 f: File-like object
139 Returns: Iterator over Entry objects
140 """
141 for line in f:
142 yield parse_reflog_line(line.rstrip(b"\n"))
145def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
146 """Drop the specified reflog entry.
148 Args:
149 f: File-like object
150 index: Reflog entry index (in Git reflog reverse 0-indexed order)
151 rewrite: If a reflog entry's predecessor is removed, set its
152 old SHA to the new SHA of the entry that now precedes it
153 """
154 if index < 0:
155 raise ValueError(f"Invalid reflog index {index}")
157 log = []
158 offset = f.tell()
159 for line in f:
160 log.append((offset, parse_reflog_line(line)))
161 offset = f.tell()
163 inverse_index = len(log) - index - 1
164 write_offset = log[inverse_index][0]
165 f.seek(write_offset)
167 if index == 0:
168 f.truncate()
169 return
171 del log[inverse_index]
172 if rewrite and index > 0 and log:
173 if inverse_index == 0:
174 previous_new = ZERO_SHA
175 else:
176 previous_new = log[inverse_index - 1][1].new_sha
177 offset, entry = log[inverse_index]
178 log[inverse_index] = (
179 offset,
180 Entry(
181 previous_new,
182 entry.new_sha,
183 entry.committer,
184 entry.timestamp,
185 entry.timezone,
186 entry.message,
187 ),
188 )
190 for _, entry in log[inverse_index:]:
191 f.write(
192 format_reflog_line(
193 entry.old_sha,
194 entry.new_sha,
195 entry.committer,
196 entry.timestamp,
197 entry.timezone,
198 entry.message,
199 )
200 )
201 f.truncate()
204def expire_reflog(
205 f: BinaryIO,
206 expire_time: Optional[int] = None,
207 expire_unreachable_time: Optional[int] = None,
208 # String annotation to work around typing module bug in Python 3.9.0/3.9.1
209 # See: https://github.com/jelmer/dulwich/issues/1948
210 reachable_checker: "Optional[Callable[[bytes], bool]]" = None,
211) -> int:
212 """Expire reflog entries based on age and reachability.
214 Args:
215 f: File-like object for the reflog
216 expire_time: Expire entries older than this timestamp (seconds since epoch).
217 If None, entries are not expired based on age alone.
218 expire_unreachable_time: Expire unreachable entries older than this
219 timestamp. If None, unreachable entries are not expired.
220 reachable_checker: Optional callable that takes a SHA and returns True
221 if the commit is reachable. If None, all entries are considered
222 reachable.
224 Returns:
225 Number of entries expired
226 """
227 if expire_time is None and expire_unreachable_time is None:
228 return 0
230 entries = []
231 offset = f.tell()
232 for line in f:
233 entries.append((offset, parse_reflog_line(line)))
234 offset = f.tell()
236 # Filter entries that should be kept
237 kept_entries = []
238 expired_count = 0
240 for offset, entry in entries:
241 should_expire = False
243 # Check if entry is reachable
244 is_reachable = True
245 if reachable_checker is not None:
246 is_reachable = reachable_checker(entry.new_sha)
248 # Apply expiration rules
249 # Check the appropriate expiration time based on reachability
250 if is_reachable:
251 if expire_time is not None and entry.timestamp < expire_time:
252 should_expire = True
253 else:
254 if (
255 expire_unreachable_time is not None
256 and entry.timestamp < expire_unreachable_time
257 ):
258 should_expire = True
260 if should_expire:
261 expired_count += 1
262 else:
263 kept_entries.append((offset, entry))
265 # Write back the kept entries
266 if expired_count > 0:
267 f.seek(0)
268 for _, entry in kept_entries:
269 f.write(
270 format_reflog_line(
271 entry.old_sha,
272 entry.new_sha,
273 entry.committer,
274 entry.timestamp,
275 entry.timezone,
276 entry.message,
277 )
278 )
279 f.truncate()
281 return expired_count
284def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]:
285 """Iterate over all reflogs in a repository.
287 Args:
288 logs_dir: Path to the logs directory (e.g., .git/logs)
290 Yields:
291 Reference names (as bytes) that have reflogs
292 """
293 import os
294 from pathlib import Path
296 if not os.path.exists(logs_dir):
297 return
299 logs_path = Path(logs_dir)
300 for log_file in logs_path.rglob("*"):
301 if log_file.is_file():
302 # Get the ref name by removing the logs_dir prefix
303 ref_name = str(log_file.relative_to(logs_path))
304 # Convert path separators to / for refs
305 ref_name = ref_name.replace(os.sep, "/")
306 yield ref_name.encode("utf-8")