Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/refs/log.py: 49%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This module is part of GitPython and is released under the
2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
4__all__ = ["RefLog", "RefLogEntry"]
6from mmap import mmap
7import os.path as osp
8import re
9import time as _time
11from git.compat import defenc
12from git.objects.util import (
13 Serializable,
14 altz_to_utctz_str,
15 parse_date,
16)
17from git.util import (
18 Actor,
19 LockedFD,
20 LockFile,
21 assure_directory_exists,
22 bin_to_hex,
23 file_contents_ro_filepath,
24 to_native_path,
25)
27# typing ------------------------------------------------------------------
29from typing import Iterator, List, Tuple, TYPE_CHECKING, Union
31from git.types import PathLike
33if TYPE_CHECKING:
34 from io import BytesIO
36 from git.config import GitConfigParser, SectionConstraint
37 from git.refs import SymbolicReference
39# ------------------------------------------------------------------------------
class RefLogEntry(Tuple[str, str, Actor, Tuple[int, int], str]):
    """Named tuple allowing easy access to the revlog data fields.

    The underlying tuple is laid out as
    ``(oldhexsha, newhexsha, actor, (time, tz_offset), message)``.
    """

    _re_hexsha_only = re.compile(r"^[0-9A-Fa-f]{40}$")

    __slots__ = ()

    def __repr__(self) -> str:
        """Representation of ourselves in git reflog format."""
        return self.format()

    def format(self) -> str:
        """:return: A string suitable to be placed in a reflog file.

        :note:
            If :attr:`message` is ``None`` (which is what :meth:`from_line` stores
            for a line that carried no TAB-separated message), the TAB and message
            are omitted instead of writing the literal text ``None``.
        """
        act = self.actor
        time = self.time
        info = "{} {} {} <{}> {!s} {}".format(
            self.oldhexsha,
            self.newhexsha,
            act.name,
            act.email,
            time[0],
            altz_to_utctz_str(time[1]),
        )
        # Entries parsed from a message-less line hold None; serializing that with
        # str.format would write "None" into the log as if it were the message.
        if self.message is None:
            return info + "\n"
        return "{}\t{}\n".format(info, self.message)

    @property
    def oldhexsha(self) -> str:
        """The hexsha to the commit the ref pointed to before the change."""
        return self[0]

    @property
    def newhexsha(self) -> str:
        """The hexsha to the commit the ref now points to, after the change."""
        return self[1]

    @property
    def actor(self) -> Actor:
        """Actor instance, providing access."""
        return self[2]

    @property
    def time(self) -> Tuple[int, int]:
        """Time as tuple:

        * [0] = ``int(time)``
        * [1] = ``int(timezone_offset)`` in :attr:`time.altzone` format
        """
        return self[3]

    @property
    def message(self) -> str:
        """Message describing the operation that acted on the reference.

        :note:
            May be ``None`` for entries created by :meth:`from_line` from a line
            without a TAB-separated message.
        """
        return self[4]

    @classmethod
    def new(
        cls,
        oldhexsha: str,
        newhexsha: str,
        actor: Actor,
        time: int,
        tz_offset: int,
        message: str,
    ) -> "RefLogEntry":  # skipcq: PYL-W0621
        """:return: New instance of a :class:`RefLogEntry`

        :raise ValueError:
            If `actor` is not an :class:`~git.util.Actor` instance.
        """
        if not isinstance(actor, Actor):
            raise ValueError("Need actor instance, got %s" % actor)
        # END check types
        # Build through ``cls`` so that subclasses get instances of themselves.
        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), message))

    @classmethod
    def from_line(cls, line: bytes) -> "RefLogEntry":
        """:return: New :class:`RefLogEntry` instance from the given revlog line.

        :param line:
            Line bytes without trailing newline

        :raise ValueError:
            If `line` could not be parsed.
        """
        line_str = line.decode(defenc)

        # Everything before the first TAB is the fixed-layout info part; everything
        # after it (further TABs included) is the message. Without any TAB the
        # message is recorded as None.
        info, tab, rest = line_str.partition("\t")
        msg = rest if tab else None

        # Two 40-character hexshas separated by a single character.
        oldhexsha = info[:40]
        newhexsha = info[41:81]
        for hexsha in (oldhexsha, newhexsha):
            if not cls._re_hexsha_only.match(hexsha):
                raise ValueError("Invalid hexsha: %r" % (hexsha,))
            # END if hexsha re doesn't match
        # END for each hexsha

        # The actor ("name <email>") starts at offset 82 and ends with the ">".
        email_end = info.find(">", 82)
        if email_end == -1:
            raise ValueError("Missing token: >")
        # END handle missing end brace

        actor = Actor._from_string(info[82 : email_end + 1])
        # Skip the ">" and the following separator; the rest is the date.
        time, tz_offset = parse_date(info[email_end + 2 :])  # skipcq: PYL-W0621

        return cls((oldhexsha, newhexsha, actor, (time, tz_offset), msg))
class RefLog(List[RefLogEntry], Serializable):
    R"""A reflog contains :class:`RefLogEntry`\s, each of which defines a certain state
    of the head in question. Custom query methods allow to retrieve log entries by date
    or by other criteria.

    Reflog entries are ordered. The first added entry is first in the list. The last
    entry, i.e. the last change of the head or reference, is last in the list.
    """

    __slots__ = ("_path",)

    def __new__(cls, filepath: Union[PathLike, None] = None) -> "RefLog":
        # ``filepath`` is accepted only so the signature matches __init__; the list
        # itself is created empty and populated in __init__.
        inst = super().__new__(cls)
        return inst

    def __init__(self, filepath: Union[PathLike, None] = None) -> None:
        """Initialize this instance with an optional filepath, from which we will
        initialize our data. The path is also used to write changes back using the
        :meth:`write` method."""
        self._path = filepath
        if filepath is not None:
            self._read_from_file()
        # END handle filepath

    def _read_from_file(self) -> None:
        """Fill this list with the entries of the file at ``self._path``.

        An :class:`OSError` while opening the file is ignored, leaving the log empty.
        """
        try:
            fmap = file_contents_ro_filepath(self._path, stream=True, allow_mmap=True)
        except OSError:
            # It is possible and allowed that the file doesn't exist!
            return
        # END handle invalid log

        try:
            self._deserialize(fmap)
        finally:
            fmap.close()
        # END handle closing of handle

    # { Interface

    @classmethod
    def from_file(cls, filepath: PathLike) -> "RefLog":
        """
        :return:
            A new :class:`RefLog` instance containing all entries from the reflog at the
            given `filepath`. If opening the file raises :class:`OSError` (e.g. it does
            not exist), the returned log is empty.

        :param filepath:
            Path to reflog.

        :raise ValueError:
            If a line of the file could not be parsed.
        """
        return cls(filepath)

    @classmethod
    def path(cls, ref: "SymbolicReference") -> str:
        """
        :return:
            String to absolute path at which the reflog of the given ref instance would
            be found. The path is not guaranteed to point to a valid file though.

        :param ref:
            :class:`~git.refs.symbolic.SymbolicReference` instance
        """
        return osp.join(ref.repo.git_dir, "logs", to_native_path(ref.path))

    @classmethod
    def iter_entries(cls, stream: Union[str, "BytesIO", mmap]) -> Iterator[RefLogEntry]:
        """
        :return:
            Iterator yielding :class:`RefLogEntry` instances, one for each line read
            from the given stream.

        :param stream:
            File-like object containing the revlog in its native format or string
            instance pointing to a file to read.

        :note:
            A stream passed in by the caller is left open. A file opened here from a
            path is closed once the iterator is exhausted or closed.
        """
        new_entry = RefLogEntry.from_line
        opened_here = isinstance(stream, str)
        if opened_here:
            # Default args return mmap since Python 3.
            _stream = file_contents_ro_filepath(stream)
            assert isinstance(_stream, mmap)
        else:
            _stream = stream
        # END handle stream type
        try:
            while True:
                line = _stream.readline()
                if not line:
                    return
                yield new_entry(line.strip())
            # END endless loop
        finally:
            # Only release what we acquired ourselves; yielded entries hold decoded
            # copies of the data, so closing the mapping does not invalidate them.
            if opened_here:
                _stream.close()

    @classmethod
    def entry_at(cls, filepath: PathLike, index: int) -> "RefLogEntry":
        """
        :return:
            :class:`RefLogEntry` at the given index.

        :param filepath:
            Full path to the reflog file from which to read the entry.

        :param index:
            Python list compatible index, i.e. it may be negative to specify an entry
            counted from the end of the list.

        :raise IndexError:
            If the entry didn't exist.

        :note:
            This method is faster as it only parses the entry at index, skipping all
            other lines. Nonetheless, the whole file has to be read if the index is
            negative.
        """
        with open(filepath, "rb") as fp:
            if index < 0:
                # Negative indices need the total line count, hence read everything.
                return RefLogEntry.from_line(fp.readlines()[index].strip())
            # Read until index is reached.

            for i in range(index + 1):
                line = fp.readline()
                if not line:
                    raise IndexError(f"Index file ended at line {i + 1}, before given index was reached")
                # END abort on eof
            # END handle runup

            # ``line`` now holds the last line read, i.e. the one at ``index``.
            return RefLogEntry.from_line(line.strip())
        # END handle index

    def to_file(self, filepath: PathLike) -> None:
        """Write the contents of the reflog instance to a file at the given filepath.

        :param filepath:
            Path to file. :func:`~git.util.assure_directory_exists` is called for it
            before the file is opened for writing.
        """
        lfd = LockedFD(filepath)
        assure_directory_exists(filepath, is_file=True)

        fp = lfd.open(write=True, stream=True)
        try:
            self._serialize(fp)
            lfd.commit()
        except BaseException:
            # Any failure, including interrupts, discards the partial write.
            lfd.rollback()
            raise
        # END handle change

    @classmethod
    def append_entry(
        cls,
        config_reader: Union[Actor, "GitConfigParser", "SectionConstraint", None],
        filepath: PathLike,
        oldbinsha: bytes,
        newbinsha: bytes,
        message: str,
        write: bool = True,
    ) -> "RefLogEntry":
        """Append a new log entry to the revlog at filepath.

        :param config_reader:
            Configuration reader of the repository - used to obtain user information.
            May also be an :class:`~git.util.Actor` instance identifying the committer
            directly or ``None``.

        :param filepath:
            Full path to the log file.

        :param oldbinsha:
            Binary sha of the previous commit.

        :param newbinsha:
            Binary sha of the current commit.

        :param message:
            Message describing the change to the reference. Only its first line is
            recorded.

        :param write:
            If ``True``, the changes will be written right away.
            Otherwise the change will not be written.

        :return:
            :class:`RefLogEntry` objects which was appended to the log.

        :raise ValueError:
            If either sha is not exactly 20 bytes long.

        :note:
            As we are append-only, concurrent access is not a problem as we do not
            interfere with readers.
        """

        if len(oldbinsha) != 20 or len(newbinsha) != 20:
            raise ValueError("Shas need to be given in binary format")
        # END handle sha type
        assure_directory_exists(filepath, is_file=True)
        first_line = message.split("\n")[0]
        if isinstance(config_reader, Actor):
            committer = config_reader  # mypy thinks this is Actor | Gitconfigparser, but why?
        else:
            committer = Actor.committer(config_reader)
        entry = RefLogEntry(
            (
                bin_to_hex(oldbinsha).decode("ascii"),
                bin_to_hex(newbinsha).decode("ascii"),
                committer,
                (int(_time.time()), _time.altzone),
                first_line,
            )
        )

        if write:
            lf = LockFile(filepath)
            lf._obtain_lock_or_raise()
            # Everything after acquiring the lock is guarded, so a failing open()
            # or write() can no longer leave the lock held.
            try:
                with open(filepath, "ab") as fd:
                    fd.write(entry.format().encode(defenc))
            finally:
                lf._release_lock()
        # END handle write operation
        return entry

    def write(self) -> "RefLog":
        """Write this instance's data to the file we are originating from.

        :return:
            self

        :raise ValueError:
            If this instance was created without a path.
        """
        if self._path is None:
            raise ValueError("Instance was not initialized with a path, use to_file(...) instead")
        # END assert path
        self.to_file(self._path)
        return self

    # } END interface

    # { Serializable Interface

    def _serialize(self, stream: "BytesIO") -> "RefLog":
        """Write every entry, in list order, to `stream` in reflog format.

        :return:
            self
        """
        write = stream.write

        # Write all entries.
        for e in self:
            write(e.format().encode(defenc))
        # END for each entry
        return self

    def _deserialize(self, stream: "BytesIO") -> "RefLog":
        """Append all entries parsed from `stream` to this list.

        :return:
            self
        """
        self.extend(self.iter_entries(stream))
        return self

    # } END serializable interface