Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/refs/log.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This module is part of GitPython and is released under the
2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
4__all__ = ["RefLog", "RefLogEntry"]
6from mmap import mmap
7import re
8import time as _time
10from git.compat import defenc
11from git.objects.util import (
12 Serializable,
13 altz_to_utctz_str,
14 parse_date,
15)
16from git.util import (
17 Actor,
18 LockedFD,
19 LockFile,
20 assure_directory_exists,
21 bin_to_hex,
22 file_contents_ro_filepath,
23 to_native_path,
24)
26# typing ------------------------------------------------------------------
28from typing import Iterator, List, Tuple, TYPE_CHECKING, Union
30from git.types import PathLike
32if TYPE_CHECKING:
33 from io import BytesIO
35 from git.config import GitConfigParser, SectionConstraint
36 from git.refs import SymbolicReference
38# ------------------------------------------------------------------------------
41class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
42 """Named tuple allowing easy access to the revlog data fields."""
44 _re_hexsha_only = re.compile(r"^[0-9A-Fa-f]{40}$")
46 __slots__ = ()
48 def __repr__(self) -> str:
49 """Representation of ourselves in git reflog format."""
50 return self.format()
52 def format(self) -> str:
53 """:return: A string suitable to be placed in a reflog file."""
54 act = self.actor
55 time = self.time
56 return "{} {} {} <{}> {!s} {}\t{}\n".format(
57 self.oldhexsha,
58 self.newhexsha,
59 act.name,
60 act.email,
61 time[0],
62 altz_to_utctz_str(time[1]),
63 self.message,
64 )
66 @property
67 def oldhexsha(self) -> str:
68 """The hexsha to the commit the ref pointed to before the change."""
69 return self[0]
71 @property
72 def newhexsha(self) -> str:
73 """The hexsha to the commit the ref now points to, after the change."""
74 return self[1]
76 @property
77 def actor(self) -> Actor:
78 """Actor instance, providing access."""
79 return self[2]
81 @property
82 def time(self) -> Tuple[int, int]:
83 """Time as tuple:
85 * [0] = ``int(time)``
86 * [1] = ``int(timezone_offset)`` in :attr:`time.altzone` format
87 """
88 return self[3]
90 @property
91 def message(self) -> str:
92 """Message describing the operation that acted on the reference."""
93 return self[4]
95 @classmethod
96 def new(
97 cls,
98 oldhexsha: str,
99 newhexsha: str,
100 actor: Actor,
101 time: int,
102 tz_offset: int,
103 message: str,
104 ) -> "RefLogEntry": # skipcq: PYL-W0621
105 """:return: New instance of a :class:`RefLogEntry`"""
106 if not isinstance(actor, Actor):
107 raise ValueError("Need actor instance, got %s" % actor)
108 # END check types
109 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), message))
111 @classmethod
112 def from_line(cls, line: bytes) -> "RefLogEntry":
113 """:return: New :class:`RefLogEntry` instance from the given revlog line.
115 :param line:
116 Line bytes without trailing newline
118 :raise ValueError:
119 If `line` could not be parsed.
120 """
121 line_str = line.decode(defenc)
122 fields = line_str.split("\t", 1)
123 if len(fields) == 1:
124 info, msg = fields[0], None
125 elif len(fields) == 2:
126 info, msg = fields
127 else:
128 raise ValueError("Line must have up to two TAB-separated fields. Got %s" % repr(line_str))
129 # END handle first split
131 oldhexsha = info[:40]
132 newhexsha = info[41:81]
133 for hexsha in (oldhexsha, newhexsha):
134 if not cls._re_hexsha_only.match(hexsha):
135 raise ValueError("Invalid hexsha: %r" % (hexsha,))
136 # END if hexsha re doesn't match
137 # END for each hexsha
139 email_end = info.find(">", 82)
140 if email_end == -1:
141 raise ValueError("Missing token: >")
142 # END handle missing end brace
144 actor = Actor._from_string(info[82 : email_end + 1])
145 time, tz_offset = parse_date(info[email_end + 2 :]) # skipcq: PYL-W0621
147 return RefLogEntry((oldhexsha, newhexsha, actor, (time, tz_offset), msg)) # type: ignore [arg-type]
150class RefLog(List[RefLogEntry], Serializable):
151 R"""A reflog contains :class:`RefLogEntry`\s, each of which defines a certain state
152 of the head in question. Custom query methods allow to retrieve log entries by date
153 or by other criteria.
155 Reflog entries are ordered. The first added entry is first in the list. The last
156 entry, i.e. the last change of the head or reference, is last in the list.
157 """
159 __slots__ = ("_path",)
161 def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
162 inst = super().__new__(cls)
163 return inst
165 def __init__(self, filepath: Union[PathLike, None] = None) -> None:
166 """Initialize this instance with an optional filepath, from which we will
167 initialize our data. The path is also used to write changes back using the
168 :meth:`write` method."""
169 self._path = filepath
170 if filepath is not None:
171 self._read_from_file()
172 # END handle filepath
174 def _read_from_file(self) -> None:
175 try:
176 fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
177 except OSError:
178 # It is possible and allowed that the file doesn't exist!
179 return
180 # END handle invalid log
182 try:
183 self._deserialize(fmap)
184 finally:
185 fmap.close()
186 # END handle closing of handle
188 # { Interface
190 @classmethod
191 def from_file(cls, filepath: PathLike) -> "RefLog":
192 """
193 :return:
194 A new :class:`RefLog` instance containing all entries from the reflog at the
195 given `filepath`.
197 :param filepath:
198 Path to reflog.
200 :raise ValueError:
201 If the file could not be read or was corrupted in some way.
202 """
203 return cls(filepath)
205 @classmethod
206 def path(cls, ref: "SymbolicReference") -> str:
207 """
208 :return:
209 String to absolute path at which the reflog of the given ref instance would
210 be found. The path is not guaranteed to point to a valid file though.
212 :param ref:
213 :class:`~git.refs.symbolic.SymbolicReference` instance
215 :raise ValueError:
216 If `ref.path` is invalid or escapes the repository's reflog directory.
217 """
218 return to_native_path(ref._get_validated_reflog_path(ref.repo, ref.path))
220 @classmethod
221 def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
222 """
223 :return:
224 Iterator yielding :class:`RefLogEntry` instances, one for each line read
225 from the given stream.
227 :param stream:
228 File-like object containing the revlog in its native format or string
229 instance pointing to a file to read.
230 """
231 new_entry = RefLogEntry.from_line
232 if isinstance(stream, str):
233 # Default args return mmap since Python 3.
234 _stream = file_contents_ro_filepath(stream)
235 assert isinstance(_stream, mmap)
236 else:
237 _stream = stream
238 # END handle stream type
239 while True:
240 line = _stream.readline()
241 if not line:
242 return
243 yield new_entry(line.strip())
244 # END endless loop
246 @classmethod
247 def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
248 """
249 :return:
250 :class:`RefLogEntry` at the given index.
252 :param filepath:
253 Full path to the index file from which to read the entry.
255 :param index:
256 Python list compatible index, i.e. it may be negative to specify an entry
257 counted from the end of the list.
259 :raise IndexError:
260 If the entry didn't exist.
262 :note:
263 This method is faster as it only parses the entry at index, skipping all
264 other lines. Nonetheless, the whole file has to be read if the index is
265 negative.
266 """
267 with open(filepath, "rb") as fp:
268 if index < 0:
269 return RefLogEntry.from_line(fp.readlines()[index].strip())
270 # Read until index is reached.
272 for i in range(index + 1):
273 line = fp.readline()
274 if not line:
275 raise IndexError(f"Index file ended at line {i + 1}, before given index was reached")
276 # END abort on eof
277 # END handle runup
279 return RefLogEntry.from_line(line.strip())
280 # END handle index
282 def to_file(self, filepath: PathLike) -> None:
283 """Write the contents of the reflog instance to a file at the given filepath.
285 :param filepath:
286 Path to file. Parent directories are assumed to exist.
287 """
288 lfd = LockedFD(filepath)
289 assure_directory_exists(filepath, is_file=True)
291 fp = lfd.open(write=True, stream=True)
292 try:
293 self._serialize(fp)
294 lfd.commit()
295 except BaseException:
296 lfd.rollback()
297 raise
298 # END handle change
300 @classmethod
301 def append_entry(
302 cls,
303 config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
304 filepath: PathLike,
305 oldbinsha: bytes,
306 newbinsha: bytes,
307 message: str,
308 write: bool = True,
309 ) -> "RefLogEntry":
310 """Append a new log entry to the revlog at filepath.
312 :param config_reader:
313 Configuration reader of the repository - used to obtain user information.
314 May also be an :class:`~git.util.Actor` instance identifying the committer
315 directly or ``None``.
317 :param filepath:
318 Full path to the log file.
320 :param oldbinsha:
321 Binary sha of the previous commit.
323 :param newbinsha:
324 Binary sha of the current commit.
326 :param message:
327 Message describing the change to the reference.
329 :param write:
330 If ``True``, the changes will be written right away.
331 Otherwise the change will not be written.
333 :return:
334 :class:`RefLogEntry` objects which was appended to the log.
336 :note:
337 As we are append-only, concurrent access is not a problem as we do not
338 interfere with readers.
339 """
341 if len(oldbinsha) != 20 or len(newbinsha) != 20:
342 raise ValueError("Shas need to be given in binary format")
343 # END handle sha type
344 assure_directory_exists(filepath, is_file=True)
345 first_line = message.split("\n")[0]
346 if isinstance(config_reader, Actor):
347 committer = config_reader # mypy thinks this is Actor | Gitconfigparser, but why?
348 else:
349 committer = Actor.committer(config_reader)
350 entry = RefLogEntry(
351 (
352 bin_to_hex(oldbinsha).decode("ascii"),
353 bin_to_hex(newbinsha).decode("ascii"),
354 committer,
355 (int(_time.time()), _time.altzone),
356 first_line,
357 )
358 )
360 if write:
361 lf = LockFile(filepath)
362 lf._obtain_lock_or_raise()
363 fd = open(filepath, "ab")
364 try:
365 fd.write(entry.format().encode(defenc))
366 finally:
367 fd.close()
368 lf._release_lock()
369 # END handle write operation
370 return entry
372 def write(self) -> "RefLog":
373 """Write this instance's data to the file we are originating from.
375 :return:
376 self
377 """
378 if self._path is None:
379 raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
380 # END assert path
381 self.to_file(self._path)
382 return self
384 # } END interface
386 # { Serializable Interface
388 def _serialize(self, stream: "BytesIO") -> "RefLog":
389 write = stream.write
391 # Write all entries.
392 for e in self:
393 write(e.format().encode(defenc))
394 # END for each entry
395 return self
397 def _deserialize(self, stream: "BytesIO") -> "RefLog":
398 self.extend(self.iter_entries(stream))
399 return self
401 # } END serializable interface