Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/objects/util.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6"""Utility functions for working with git objects."""
8__all__ = [
9 "get_object_type_by_name",
10 "parse_date",
11 "parse_actor_and_date",
12 "ProcessStreamAdapter",
13 "Traversable",
14 "altz_to_utctz_str",
15 "utctz_to_altz",
16 "verify_utctz",
17 "Actor",
18 "tzoffset",
19 "utc",
20]
22from abc import ABC, abstractmethod
23import calendar
24from collections import deque
25from datetime import datetime, timedelta, tzinfo
26import re
27from string import digits
28import time
29import warnings
31from git.util import Actor, IterableList, IterableObj
33# typing ------------------------------------------------------------
35from typing import (
36 Any,
37 Callable,
38 Deque,
39 Iterator,
40 NamedTuple,
41 Sequence,
42 TYPE_CHECKING,
43 Tuple,
44 Type,
45 TypeVar,
46 Union,
47 cast,
48 overload,
49)
51from git.types import Has_id_attribute, Literal
53if TYPE_CHECKING:
54 from io import BytesIO, StringIO
55 from subprocess import Popen
57 from git.types import Protocol, runtime_checkable
59 from .blob import Blob
60 from .commit import Commit
61 from .submodule.base import Submodule
62 from .tag import TagObject
63 from .tree import TraversedTreeTup, Tree
64else:
65 Protocol = ABC
67 def runtime_checkable(f):
68 return f
71class TraverseNT(NamedTuple):
72 depth: int
73 item: Union["Traversable", "Blob"]
74 src: Union["Traversable", None]
77T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj") # For TraversableIterableObj.traverse()
79TraversedTup = Union[
80 Tuple[Union["Traversable", None], "Traversable"], # For Commit, Submodule.
81 "TraversedTreeTup", # For Tree.traverse().
82]
84# --------------------------------------------------------------------
86ZERO = timedelta(0)
88# { Functions
91def mode_str_to_int(modestr: Union[bytes, str]) -> int:
92 """Convert mode bits from an octal mode string to an integer mode for git.
94 :param modestr:
95 String like ``755`` or ``644`` or ``100644`` - only the last 6 chars will be
96 used.
98 :return:
99 String identifying a mode compatible to the mode methods ids of the :mod:`stat`
100 module regarding the rwx permissions for user, group and other, special flags
101 and file system flags, such as whether it is a symlink.
102 """
103 mode = 0
104 for iteration, char in enumerate(reversed(modestr[-6:])):
105 char = cast(Union[str, int], char)
106 mode += int(char) << iteration * 3
107 # END for each char
108 return mode
111def get_object_type_by_name(
112 object_type_name: bytes,
113) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
114 """Retrieve the Python class GitPython uses to represent a kind of Git object.
116 :return:
117 A type suitable to handle the given as `object_type_name`.
118 This type can be called create new instances.
120 :param object_type_name:
121 Member of :attr:`Object.TYPES <git.objects.base.Object.TYPES>`.
123 :raise ValueError:
124 If `object_type_name` is unknown.
125 """
126 if object_type_name == b"commit":
127 from . import commit
129 return commit.Commit
130 elif object_type_name == b"tag":
131 from . import tag
133 return tag.TagObject
134 elif object_type_name == b"blob":
135 from . import blob
137 return blob.Blob
138 elif object_type_name == b"tree":
139 from . import tree
141 return tree.Tree
142 else:
143 raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
146def utctz_to_altz(utctz: str) -> int:
147 """Convert a git timezone offset into a timezone offset west of UTC in seconds
148 (compatible with :attr:`time.altzone`).
150 :param utctz:
151 git utc timezone string, e.g. +0200
152 """
153 int_utctz = int(utctz)
154 seconds = (abs(int_utctz) // 100) * 3600 + (abs(int_utctz) % 100) * 60
155 return seconds if int_utctz < 0 else -seconds
158def altz_to_utctz_str(altz: float) -> str:
159 """Convert a timezone offset west of UTC in seconds into a Git timezone offset
160 string.
162 :param altz:
163 Timezone offset in seconds west of UTC.
164 """
165 hours = abs(altz) // 3600
166 minutes = (abs(altz) % 3600) // 60
167 sign = "-" if altz >= 60 else "+"
168 return "{}{:02}{:02}".format(sign, hours, minutes)
171def verify_utctz(offset: str) -> str:
172 """
173 :raise ValueError:
174 If `offset` is incorrect.
176 :return:
177 `offset`
178 """
179 fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
180 if len(offset) != 5:
181 raise fmt_exc
182 if offset[0] not in "+-":
183 raise fmt_exc
184 if offset[1] not in digits or offset[2] not in digits or offset[3] not in digits or offset[4] not in digits:
185 raise fmt_exc
186 # END for each char
187 return offset
190class tzoffset(tzinfo):
191 def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
192 self._offset = timedelta(seconds=-secs_west_of_utc)
193 self._name = name or "fixed"
195 def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]:
196 return tzoffset, (-self._offset.total_seconds(), self._name)
198 def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
199 return self._offset
201 def tzname(self, dt: Union[datetime, None]) -> str:
202 return self._name
204 def dst(self, dt: Union[datetime, None]) -> timedelta:
205 return ZERO
208utc = tzoffset(0, "UTC")
211def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
212 """Convert a `timestamp` + `tz_offset` into an aware :class:`~datetime.datetime`
213 instance."""
214 utc_dt = datetime.fromtimestamp(timestamp, utc)
215 try:
216 local_dt = utc_dt.astimezone(tzoffset(tz_offset))
217 return local_dt
218 except ValueError:
219 return utc_dt
222def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
223 """Parse the given date as one of the following:
225 * Aware datetime instance
226 * Git internal format: timestamp offset
227 * :rfc:`2822`: ``Thu, 07 Apr 2005 22:13:13 +0200``
228 * ISO 8601: ``2005-04-07T22:13:13`` - The ``T`` can be a space as well.
230 :return:
231 Tuple(int(timestamp_UTC), int(offset)), both in seconds since epoch
233 :raise ValueError:
234 If the format could not be understood.
236 :note:
237 Date can also be ``YYYY.MM.DD``, ``MM/DD/YYYY`` and ``DD.MM.YYYY``.
238 """
239 if isinstance(string_date, datetime):
240 if string_date.tzinfo:
241 utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None
242 offset = -int(utcoffset.total_seconds())
243 return int(string_date.astimezone(utc).timestamp()), offset
244 else:
245 raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
247 # Git time
248 try:
249 if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
250 timestamp, offset_str = string_date.split()
251 if timestamp.startswith("@"):
252 timestamp = timestamp[1:]
253 timestamp_int = int(timestamp)
254 return timestamp_int, utctz_to_altz(verify_utctz(offset_str))
255 else:
256 offset_str = "+0000" # Local time by default.
257 if string_date[-5] in "-+":
258 offset_str = verify_utctz(string_date[-5:])
259 string_date = string_date[:-6] # skip space as well
260 # END split timezone info
261 offset = utctz_to_altz(offset_str)
263 # Now figure out the date and time portion - split time.
264 date_formats = []
265 splitter = -1
266 if "," in string_date:
267 date_formats.append("%a, %d %b %Y")
268 splitter = string_date.rfind(" ")
269 else:
270 # ISO plus additional
271 date_formats.append("%Y-%m-%d")
272 date_formats.append("%Y.%m.%d")
273 date_formats.append("%m/%d/%Y")
274 date_formats.append("%d.%m.%Y")
276 splitter = string_date.rfind("T")
277 if splitter == -1:
278 splitter = string_date.rfind(" ")
279 # END handle 'T' and ' '
280 # END handle RFC or ISO
282 assert splitter > -1
284 # Split date and time.
285 time_part = string_date[splitter + 1 :] # Skip space.
286 date_part = string_date[:splitter]
288 # Parse time.
289 tstruct = time.strptime(time_part, "%H:%M:%S")
291 for fmt in date_formats:
292 try:
293 dtstruct = time.strptime(date_part, fmt)
294 utctime = calendar.timegm(
295 (
296 dtstruct.tm_year,
297 dtstruct.tm_mon,
298 dtstruct.tm_mday,
299 tstruct.tm_hour,
300 tstruct.tm_min,
301 tstruct.tm_sec,
302 dtstruct.tm_wday,
303 dtstruct.tm_yday,
304 tstruct.tm_isdst,
305 )
306 )
307 return int(utctime), offset
308 except ValueError:
309 continue
310 # END exception handling
311 # END for each fmt
313 # Still here ? fail.
314 raise ValueError("no format matched")
315 # END handle format
316 except Exception as e:
317 raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e
318 # END handle exceptions
321# Precompiled regexes
322_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$")
323_re_only_actor = re.compile(r"^.+? (.*)$")
326def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
327 """Parse out the actor (author or committer) info from a line like::
329 author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700
331 :return:
332 [Actor, int_seconds_since_epoch, int_timezone_offset]
333 """
334 actor, epoch, offset = "", "0", "0"
335 m = _re_actor_epoch.search(line)
336 if m:
337 actor, epoch, offset = m.groups()
338 else:
339 m = _re_only_actor.search(line)
340 actor = m.group(1) if m else line or ""
341 return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))
344# } END functions
347# { Classes
350class ProcessStreamAdapter:
351 """Class wiring all calls to the contained Process instance.
353 Use this type to hide the underlying process to provide access only to a specified
354 stream. The process is usually wrapped into an :class:`~git.cmd.Git.AutoInterrupt`
355 class to kill it if the instance goes out of scope.
356 """
358 __slots__ = ("_proc", "_stream")
360 def __init__(self, process: "Popen", stream_name: str) -> None:
361 self._proc = process
362 self._stream: StringIO = getattr(process, stream_name) # guessed type
364 def __getattr__(self, attr: str) -> Any:
365 return getattr(self._stream, attr)
368@runtime_checkable
369class Traversable(Protocol):
370 """Simple interface to perform depth-first or breadth-first traversals in one
371 direction.
373 Subclasses only need to implement one function.
375 Instances of the subclass must be hashable.
377 Defined subclasses:
379 * :class:`Commit <git.objects.Commit>`
380 * :class:`Tree <git.objects.tree.Tree>`
381 * :class:`Submodule <git.objects.submodule.base.Submodule>`
382 """
384 __slots__ = ()
386 @classmethod
387 @abstractmethod
388 def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
389 """
390 :return:
391 Tuple of items connected to the given item.
392 Must be implemented in subclass.
394 class Commit:: (cls, Commit) -> Tuple[Commit, ...]
395 class Submodule:: (cls, Submodule) -> Iterablelist[Submodule]
396 class Tree:: (cls, Tree) -> Tuple[Tree, ...]
397 """
398 raise NotImplementedError("To be implemented in subclass")
400 @abstractmethod
401 def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
402 """Traverse self and collect all items found.
404 Calling this directly on the abstract base class, including via a ``super()``
405 proxy, is deprecated. Only overridden implementations should be called.
406 """
407 warnings.warn(
408 "list_traverse() method should only be called from subclasses."
409 " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
410 " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'.",
411 DeprecationWarning,
412 stacklevel=2,
413 )
414 return self._list_traverse(*args, **kwargs)
416 def _list_traverse(
417 self, as_edge: bool = False, *args: Any, **kwargs: Any
418 ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
419 """Traverse self and collect all items found.
421 :return:
422 :class:`~git.util.IterableList` with the results of the traversal as
423 produced by :meth:`traverse`::
425 Commit -> IterableList[Commit]
426 Submodule -> IterableList[Submodule]
427 Tree -> IterableList[Union[Submodule, Tree, Blob]]
428 """
429 # Commit and Submodule have id.__attribute__ as IterableObj.
430 # Tree has id.__attribute__ inherited from IndexObject.
431 if isinstance(self, Has_id_attribute):
432 id = self._id_attribute_
433 else:
434 # Shouldn't reach here, unless Traversable subclass created with no
435 # _id_attribute_.
436 id = ""
437 # Could add _id_attribute_ to Traversable, or make all Traversable also
438 # Iterable?
440 if not as_edge:
441 out: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id)
442 out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) # noqa: B026
443 return out
444 # Overloads in subclasses (mypy doesn't allow typing self: subclass).
445 # Union[IterableList['Commit'], IterableList['Submodule'], IterableList[Union['Submodule', 'Tree', 'Blob']]]
446 else:
447 # Raise DeprecationWarning, it doesn't make sense to use this.
448 out_list: IterableList = IterableList(self.traverse(*args, **kwargs))
449 return out_list
451 @abstractmethod
452 def traverse(self, *args: Any, **kwargs: Any) -> Any:
453 """Iterator yielding items found when traversing self.
455 Calling this directly on the abstract base class, including via a ``super()``
456 proxy, is deprecated. Only overridden implementations should be called.
457 """
458 warnings.warn(
459 "traverse() method should only be called from subclasses."
460 " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
461 " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'.",
462 DeprecationWarning,
463 stacklevel=2,
464 )
465 return self._traverse(*args, **kwargs)
467 def _traverse(
468 self,
469 predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
470 prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
471 depth: int = -1,
472 branch_first: bool = True,
473 visit_once: bool = True,
474 ignore_self: int = 1,
475 as_edge: bool = False,
476 ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
477 """Iterator yielding items found when traversing `self`.
479 :param predicate:
480 A function ``f(i,d)`` that returns ``False`` if item i at depth ``d`` should
481 not be included in the result.
483 :param prune:
484 A function ``f(i,d)`` that returns ``True`` if the search should stop at
485 item ``i`` at depth ``d``. Item ``i`` will not be returned.
487 :param depth:
488 Defines at which level the iteration should not go deeper if -1. There is no
489 limit if 0, you would effectively only get `self`, the root of the
490 iteration. If 1, you would only get the first level of
491 predecessors/successors.
493 :param branch_first:
494 If ``True``, items will be returned branch first, otherwise depth first.
496 :param visit_once:
497 If ``True``, items will only be returned once, although they might be
498 encountered several times. Loops are prevented that way.
500 :param ignore_self:
501 If ``True``, `self` will be ignored and automatically pruned from the
502 result. Otherwise it will be the first item to be returned. If `as_edge` is
503 ``True``, the source of the first edge is ``None``.
505 :param as_edge:
506 If ``True``, return a pair of items, first being the source, second the
507 destination, i.e. tuple(src, dest) with the edge spanning from source to
508 destination.
510 :return:
511 Iterator yielding items found when traversing `self`::
513 Commit -> Iterator[Union[Commit, Tuple[Commit, Commit]] Submodule ->
514 Iterator[Submodule, Tuple[Submodule, Submodule]] Tree ->
515 Iterator[Union[Blob, Tree, Submodule,
516 Tuple[Union[Submodule, Tree], Union[Blob, Tree,
517 Submodule]]]
519 ignore_self=True is_edge=True -> Iterator[item] ignore_self=True
520 is_edge=False --> Iterator[item] ignore_self=False is_edge=True ->
521 Iterator[item] | Iterator[Tuple[src, item]] ignore_self=False
522 is_edge=False -> Iterator[Tuple[src, item]]
523 """
525 visited = set()
526 stack: Deque[TraverseNT] = deque()
527 stack.append(TraverseNT(0, self, None)) # self is always depth level 0.
529 def addToStack(
530 stack: Deque[TraverseNT],
531 src_item: "Traversable",
532 branch_first: bool,
533 depth: int,
534 ) -> None:
535 lst = self._get_intermediate_items(item)
536 if not lst: # Empty list
537 return
538 if branch_first:
539 stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
540 else:
541 reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
542 stack.extend(reviter)
544 # END addToStack local method
546 while stack:
547 d, item, src = stack.pop() # Depth of item, item, item_source
549 if visit_once and item in visited:
550 continue
552 if visit_once:
553 visited.add(item)
555 rval: Union[TraversedTup, "Traversable", "Blob"]
556 if as_edge:
557 # If as_edge return (src, item) unless rrc is None
558 # (e.g. for first item).
559 rval = (src, item)
560 else:
561 rval = item
563 if prune(rval, d):
564 continue
566 skipStartItem = ignore_self and (item is self)
567 if not skipStartItem and predicate(rval, d):
568 yield rval
570 # Only continue to next level if this is appropriate!
571 next_d = d + 1
572 if depth > -1 and next_d > depth:
573 continue
575 addToStack(stack, item, branch_first, next_d)
576 # END for each item on work stack
579@runtime_checkable
580class Serializable(Protocol):
581 """Defines methods to serialize and deserialize objects from and into a data
582 stream."""
584 __slots__ = ()
586 # @abstractmethod
587 def _serialize(self, stream: "BytesIO") -> "Serializable":
588 """Serialize the data of this object into the given data stream.
590 :note:
591 A serialized object would :meth:`_deserialize` into the same object.
593 :param stream:
594 A file-like object.
596 :return:
597 self
598 """
599 raise NotImplementedError("To be implemented in subclass")
601 # @abstractmethod
602 def _deserialize(self, stream: "BytesIO") -> "Serializable":
603 """Deserialize all information regarding this object from the stream.
605 :param stream:
606 A file-like object.
608 :return:
609 self
610 """
611 raise NotImplementedError("To be implemented in subclass")
614class TraversableIterableObj(IterableObj, Traversable):
615 __slots__ = ()
617 TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
619 def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
620 return super()._list_traverse(*args, **kwargs)
622 @overload
623 def traverse(self: T_TIobj) -> Iterator[T_TIobj]: ...
625 @overload
626 def traverse(
627 self: T_TIobj,
628 predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
629 prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
630 depth: int,
631 branch_first: bool,
632 visit_once: bool,
633 ignore_self: Literal[True],
634 as_edge: Literal[False],
635 ) -> Iterator[T_TIobj]: ...
637 @overload
638 def traverse(
639 self: T_TIobj,
640 predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
641 prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
642 depth: int,
643 branch_first: bool,
644 visit_once: bool,
645 ignore_self: Literal[False],
646 as_edge: Literal[True],
647 ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ...
649 @overload
650 def traverse(
651 self: T_TIobj,
652 predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
653 prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
654 depth: int,
655 branch_first: bool,
656 visit_once: bool,
657 ignore_self: Literal[True],
658 as_edge: Literal[True],
659 ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: ...
661 def traverse(
662 self: T_TIobj,
663 predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
664 prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
665 depth: int = -1,
666 branch_first: bool = True,
667 visit_once: bool = True,
668 ignore_self: int = 1,
669 as_edge: bool = False,
670 ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
671 """For documentation, see :meth:`Traversable._traverse`."""
673 ## To typecheck instead of using cast:
674 #
675 # import itertools
676 # from git.types import TypeGuard
677 # def is_commit_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Tuple['Commit', 'Commit']]]]:
678 # for x in inp[1]:
679 # if not isinstance(x, tuple) and len(x) != 2:
680 # if all(isinstance(inner, Commit) for inner in x):
681 # continue
682 # return True
683 #
684 # ret = super(Commit, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge)
685 # ret_tup = itertools.tee(ret, 2)
686 # assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
687 # return ret_tup[0]
689 return cast(
690 Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
691 super()._traverse(
692 predicate, # type: ignore[arg-type]
693 prune, # type: ignore[arg-type]
694 depth,
695 branch_first,
696 visit_once,
697 ignore_self,
698 as_edge,
699 ),
700 )