Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/objects/util.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

229 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6"""Utility functions for working with git objects.""" 

7 

8__all__ = [ 

9 "get_object_type_by_name", 

10 "parse_date", 

11 "parse_actor_and_date", 

12 "ProcessStreamAdapter", 

13 "Traversable", 

14 "altz_to_utctz_str", 

15 "utctz_to_altz", 

16 "verify_utctz", 

17 "Actor", 

18 "tzoffset", 

19 "utc", 

20] 

21 

22from abc import ABC, abstractmethod 

23import calendar 

24from collections import deque 

25from datetime import datetime, timedelta, tzinfo 

26import re 

27from string import digits 

28import time 

29import warnings 

30 

31from git.util import Actor, IterableList, IterableObj 

32 

33# typing ------------------------------------------------------------ 

34 

35from typing import ( 

36 Any, 

37 Callable, 

38 Deque, 

39 Iterator, 

40 NamedTuple, 

41 Sequence, 

42 TYPE_CHECKING, 

43 Tuple, 

44 Type, 

45 TypeVar, 

46 Union, 

47 cast, 

48 overload, 

49) 

50 

51from git.types import Has_id_attribute, Literal 

52 

53if TYPE_CHECKING: 

54 from io import BytesIO, StringIO 

55 from subprocess import Popen 

56 

57 from git.types import Protocol, runtime_checkable 

58 

59 from .blob import Blob 

60 from .commit import Commit 

61 from .submodule.base import Submodule 

62 from .tag import TagObject 

63 from .tree import TraversedTreeTup, Tree 

64else: 

65 Protocol = ABC 

66 

67 def runtime_checkable(f): 

68 return f 

69 

70 

class TraverseNT(NamedTuple):
    """Entry on the work stack used by :meth:`Traversable._traverse`."""

    # Depth of `item` relative to the traversal root (the root itself is depth 0).
    depth: int
    # The object being visited.
    item: Union["Traversable", "Blob"]
    # The item from which `item` was reached; ``None`` for the traversal root.
    src: Union["Traversable", None]

75 

76 

77T_TIobj = TypeVar("T_TIobj", bound="TraversableIterableObj") # For TraversableIterableObj.traverse() 

78 

79TraversedTup = Union[ 

80 Tuple[Union["Traversable", None], "Traversable"], # For Commit, Submodule. 

81 "TraversedTreeTup", # For Tree.traverse(). 

82] 

83 

84# -------------------------------------------------------------------- 

85 

86ZERO = timedelta(0) 

87 

88# { Functions 

89 

90 

def mode_str_to_int(modestr: Union[bytes, str]) -> int:
    """Convert mode bits from an octal mode string to an integer mode for git.

    :param modestr:
        String like ``755`` or ``644`` or ``100644`` - only the last 6 chars will be
        used. A :class:`bytes` value is interpreted as ASCII digits.

    :return:
        Integer mode, laid out like the mode values handled by the :mod:`stat`
        module: each of the (up to six) digits contributes three bits, the last
        digit being the least significant.

    :raise ValueError:
        If a character is not a decimal digit, or `modestr` is :class:`bytes` that
        are not valid ASCII.
    """
    # Iterating over bytes yields integer byte values (e.g. 53 for b"5"), not digit
    # characters, so decode first to make both input types behave the same.
    if isinstance(modestr, (bytes, bytearray)):
        modestr = modestr.decode("ascii")

    mode = 0
    # Walk the digits from least to most significant; each one occupies 3 bits.
    for position, digit in enumerate(reversed(modestr[-6:])):
        mode += int(digit) << position * 3
    return mode

109 

110 

def get_object_type_by_name(
    object_type_name: bytes,
) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
    """Look up the class GitPython uses to represent a kind of Git object.

    The defining modules are imported lazily, only for the branch that matches.

    :param object_type_name:
        One of ``b"commit"``, ``b"tag"``, ``b"blob"`` or ``b"tree"``.

    :return:
        The class handling objects of the given type. It can be called to create
        new instances.

    :raise ValueError:
        If `object_type_name` is none of the known names.
    """
    if object_type_name == b"commit":
        from . import commit

        return commit.Commit

    if object_type_name == b"tag":
        from . import tag

        return tag.TagObject

    if object_type_name == b"blob":
        from . import blob

        return blob.Blob

    if object_type_name == b"tree":
        from . import tree

        return tree.Tree

    raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())

144 

145 

def utctz_to_altz(utctz: str) -> int:
    """Turn a git timezone offset into seconds west of UTC (the convention used by
    :attr:`time.altzone`).

    :param utctz:
        Git UTC offset string in ``[+-]HHMM`` form, e.g. ``+0200``.

    :return:
        Offset in seconds; positive for offsets written with ``-``, negative (or
        zero) otherwise.
    """
    numeric = int(utctz)
    # The decimal value encodes hours in the hundreds and minutes in the remainder.
    hours, minutes = divmod(abs(numeric), 100)
    magnitude = hours * 3600 + minutes * 60
    if numeric < 0:
        return magnitude
    return -magnitude

156 

157 

def altz_to_utctz_str(altz: float) -> str:
    """Convert a timezone offset west of UTC in seconds into a Git timezone offset
    string.

    :param altz:
        Timezone offset in seconds west of UTC. Fractions of a second, and seconds
        that do not make up a whole minute, are discarded.

    :return:
        Offset string of the form ``[+-]HHMM``.
    """
    # Work on integers so a float argument (as the annotation allows) is rendered
    # as zero-padded digits rather than e.g. "1.0".
    whole_minutes = int(abs(altz)) // 60
    hours, minutes = divmod(whole_minutes, 60)
    # Offsets are west of UTC, so positive values map to "-". Anything below one
    # minute renders as hours == minutes == 0 and is given the "+" sign.
    sign = "-" if altz >= 60 else "+"
    return "{}{:02}{:02}".format(sign, hours, minutes)

169 

170 

def verify_utctz(offset: str) -> str:
    """Check that `offset` is a sign character followed by exactly four digits.

    :param offset:
        Candidate timezone offset, e.g. ``+0200``.

    :raise ValueError:
        If `offset` does not have that form.

    :return:
        `offset`, unchanged.
    """
    fmt_exc = ValueError("Invalid timezone offset format: %s" % offset)
    well_formed = len(offset) == 5 and offset[0] in "+-" and all(ch in digits for ch in offset[1:])
    if not well_formed:
        raise fmt_exc
    return offset

188 

189 

class tzoffset(tzinfo):
    """Fixed-offset :class:`~datetime.tzinfo` built from seconds west of UTC."""

    def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
        """
        :param secs_west_of_utc:
            Offset in seconds west of UTC; it is negated to obtain the UTC offset.

        :param name:
            Name reported by :meth:`tzname`. ``"fixed"`` is used if it is omitted or
            empty.
        """
        self._offset = timedelta(seconds=-secs_west_of_utc)
        self._name = name if name else "fixed"

    def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]:
        """Support pickling by returning the class and its constructor arguments."""
        secs_west_of_utc = -self._offset.total_seconds()
        return tzoffset, (secs_west_of_utc, self._name)

    def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
        """Return the fixed UTC offset; `dt` is ignored."""
        return self._offset

    def tzname(self, dt: Union[datetime, None]) -> str:
        """Return the name given at construction; `dt` is ignored."""
        return self._name

    def dst(self, dt: Union[datetime, None]) -> timedelta:
        """Return a zero adjustment, as a fixed offset has no daylight saving time."""
        return ZERO

206 

207 

208utc = tzoffset(0, "UTC") 

209 

210 

def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
    """Build an aware :class:`~datetime.datetime` from a `timestamp` and a
    `tz_offset`.

    :param timestamp:
        Seconds since the epoch.

    :param tz_offset:
        Offset in seconds west of UTC, as accepted by :class:`tzoffset`.

    :return:
        The moment expressed in the given offset, or in UTC if converting to that
        offset raises :exc:`ValueError`.
    """
    as_utc = datetime.fromtimestamp(timestamp, utc)
    try:
        return as_utc.astimezone(tzoffset(tz_offset))
    except ValueError:
        # Fall back to the UTC representation rather than failing.
        return as_utc

220 

221 

def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
    """Parse the given date, which must be one of the following:

    * Aware datetime instance
    * Git internal format: timestamp offset
    * :rfc:`2822`: ``Thu, 07 Apr 2005 22:13:13 +0200``
    * ISO 8601: ``2005-04-07T22:13:13`` - The ``T`` can be a space as well.

    :return:
        Tuple ``(timestamp, offset)``. The offset is in seconds west of UTC. For the
        textual date formats, the timestamp is computed from the parsed date and
        time fields with :func:`calendar.timegm`; the offset is returned alongside
        it and is not applied to the timestamp.

    :raise ValueError:
        If the format could not be understood, or if a datetime without tzinfo is
        passed.

    :note:
        Date can also be ``YYYY.MM.DD``, ``MM/DD/YYYY`` and ``DD.MM.YYYY``.
    """
    if isinstance(string_date, datetime):
        if not string_date.tzinfo:
            raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
        aware_offset = cast(timedelta, string_date.utcoffset())  # Not None, as tzinfo is set.
        offset = -int(aware_offset.total_seconds())
        return int(string_date.astimezone(utc).timestamp()), offset

    try:
        # Git's internal format: "<timestamp> <offset>", optionally prefixed with "@".
        if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
            raw_seconds, raw_offset = string_date.split()
            if raw_seconds.startswith("@"):
                raw_seconds = raw_seconds[1:]
            seconds = int(raw_seconds)
            return seconds, utctz_to_altz(verify_utctz(raw_offset))

        # Strip a trailing "[+-]HHMM" if present; a zero offset is assumed otherwise.
        tz_text = "+0000"
        if string_date[-5] in "-+":
            tz_text = verify_utctz(string_date[-5:])
            string_date = string_date[:-6]  # Drop the separating space too.
        offset = utctz_to_altz(tz_text)

        # Choose candidate date formats and find where the date ends and the time begins.
        if "," in string_date:
            date_formats = ["%a, %d %b %Y"]
            split_at = string_date.rfind(" ")
        else:
            # ISO plus additional formats.
            date_formats = ["%Y-%m-%d", "%Y.%m.%d", "%m/%d/%Y", "%d.%m.%Y"]
            split_at = string_date.rfind("T")
            if split_at == -1:
                split_at = string_date.rfind(" ")

        assert split_at > -1

        date_part = string_date[:split_at]
        time_part = string_date[split_at + 1 :]  # Skip the separator.

        clock = time.strptime(time_part, "%H:%M:%S")

        for fmt in date_formats:
            try:
                day = time.strptime(date_part, fmt)
                fields = (
                    day.tm_year,
                    day.tm_mon,
                    day.tm_mday,
                    clock.tm_hour,
                    clock.tm_min,
                    clock.tm_sec,
                    day.tm_wday,
                    day.tm_yday,
                    clock.tm_isdst,
                )
                return int(calendar.timegm(fields)), offset
            except ValueError:
                continue

        # None of the candidate formats matched.
        raise ValueError("no format matched")
    except Exception as e:
        raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e

319 

320 

321# Precompiled regexes 

# Matches "<keyword> <actor> <epoch> <offset>", capturing the last three parts.
_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$")
# Fallback for lines without a date: captures everything after the first word.
_re_only_actor = re.compile(r"^.+? (.*)$")


def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
    """Extract the actor (author or committer) and date from a line like::

        author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700

    If the line carries no epoch and offset, both are taken to be zero.

    :return:
        [Actor, int_seconds_since_epoch, int_timezone_offset]
    """
    full_match = _re_actor_epoch.search(line)
    if full_match:
        actor, epoch, offset = full_match.groups()
    else:
        epoch, offset = "0", "0"
        name_match = _re_only_actor.search(line)
        if name_match:
            actor = name_match.group(1)
        else:
            actor = line or ""
    return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))

342 

343 

344# } END functions 

345 

346 

347# { Classes 

348 

349 

class ProcessStreamAdapter:
    """Adapter that exposes a single stream of a process.

    The process object is kept alongside the stream, while attribute access on the
    adapter is forwarded to the stream only. The process is usually wrapped into an
    :class:`~git.cmd.Git.AutoInterrupt` class to kill it if the instance goes out of
    scope.
    """

    __slots__ = ("_proc", "_stream")

    def __init__(self, process: "Popen", stream_name: str) -> None:
        """
        :param process:
            The process whose stream is exposed.

        :param stream_name:
            Name of the attribute of `process` that holds the stream.
        """
        self._proc = process
        self._stream: StringIO = getattr(process, stream_name)  # guessed type

    def __getattr__(self, attr: str) -> Any:
        """Forward lookups of attributes not found on the adapter to the stream."""
        return getattr(self._stream, attr)

366 

367 

@runtime_checkable
class Traversable(Protocol):
    """Simple interface to perform depth-first or breadth-first traversals in one
    direction.

    Subclasses only need to implement one function.

    Instances of the subclass must be hashable.

    Defined subclasses:

    * :class:`Commit <git.objects.Commit>`
    * :class:`Tree <git.objects.tree.Tree>`
    * :class:`Submodule <git.objects.submodule.base.Submodule>`
    """

    __slots__ = ()

    @classmethod
    @abstractmethod
    def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
        """
        :return:
            Sequence of the items directly connected to the given item.
            Must be implemented in subclass.

        class Commit::     (cls, Commit) -> Tuple[Commit, ...]
        class Submodule::  (cls, Submodule) -> Iterablelist[Submodule]
        class Tree::       (cls, Tree) -> Tuple[Tree, ...]
        """
        raise NotImplementedError("To be implemented in subclass")

    @abstractmethod
    def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Traverse self and collect all items found.

        Calling this directly on the abstract base class, including via a ``super()``
        proxy, is deprecated. Only overridden implementations should be called.
        """
        message = (
            "list_traverse() method should only be called from subclasses."
            " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
            " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self._list_traverse(*args, **kwargs)

    def _list_traverse(
        self, as_edge: bool = False, *args: Any, **kwargs: Any
    ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
        """Run :meth:`traverse` and gather everything it yields into a list.

        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`::

                Commit -> IterableList[Commit]
                Submodule ->  IterableList[Submodule]
                Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        # Fall back to an empty id attribute for Traversable subclasses that do not
        # define _id_attribute_.
        id_attr = self._id_attribute_ if isinstance(self, Has_id_attribute) else ""

        if as_edge:
            # In this branch `as_edge` is not forwarded to traverse().
            edge_items: IterableList = IterableList(self.traverse(*args, **kwargs))
            return edge_items

        collected: IterableList[Union["Commit", "Submodule", "Tree", "Blob"]] = IterableList(id_attr)
        collected.extend(self.traverse(as_edge=as_edge, *args, **kwargs))  # noqa: B026
        return collected

    @abstractmethod
    def traverse(self, *args: Any, **kwargs: Any) -> Any:
        """Iterator yielding items found when traversing self.

        Calling this directly on the abstract base class, including via a ``super()``
        proxy, is deprecated. Only overridden implementations should be called.
        """
        message = (
            "traverse() method should only be called from subclasses."
            " Calling from Traversable abstract class will raise NotImplementedError in 4.0.0."
            " The concrete subclasses in GitPython itself are 'Commit', 'RootModule', 'Submodule', and 'Tree'."
        )
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        return self._traverse(*args, **kwargs)

    def _traverse(
        self,
        predicate: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union["Traversable", "Blob", TraversedTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
        """Iterator yielding items found when traversing `self`.

        :param predicate:
            A function ``f(i,d)`` that returns ``False`` if item ``i`` at depth ``d``
            should not be included in the result. With `as_edge`, ``i`` is the
            ``(src, item)`` tuple.

        :param prune:
            A function ``f(i,d)`` that returns ``True`` if the search should stop at
            item ``i`` at depth ``d``. Item ``i`` will not be returned, and the items
            connected to it are not followed.

        :param depth:
            Deepest level the iteration descends to. If -1, there is no limit. If 0,
            only `self`, the root of the iteration, is visited. If 1, only the first
            level of predecessors/successors is visited in addition.

        :param branch_first:
            If ``True``, items will be returned branch first, otherwise depth first.

        :param visit_once:
            If ``True``, items will only be returned once, although they might be
            encountered several times. Loops are prevented that way.

        :param ignore_self:
            If ``True``, `self` will not be part of the result, although the items
            connected to it are still traversed. Otherwise it will be the first item
            to be returned. If `as_edge` is ``True``, the source of the first edge is
            ``None``.

        :param as_edge:
            If ``True``, return a pair of items, first being the source, second the
            destination, i.e. tuple(src, dest) with the edge spanning from source to
            destination.

        :return:
            Iterator yielding the items found when traversing `self`: the items
            themselves if `as_edge` is ``False``, ``(src, item)`` tuples otherwise.
        """
        seen = set()
        # The root is always at depth level 0 and has no source item.
        pending: Deque[TraverseNT] = deque([TraverseNT(0, self, None)])

        def enqueue_children(parent: "Traversable", child_depth: int) -> None:
            children = self._get_intermediate_items(parent)
            if not children:
                return
            if branch_first:
                # Entries are popped from the right, so adding on the left makes
                # every entry already pending come out before these children.
                pending.extendleft(TraverseNT(child_depth, child, parent) for child in children)
            else:
                # Add on the right in reverse, so the first child is popped next.
                pending.extend(TraverseNT(child_depth, child, parent) for child in reversed(children))

        while pending:
            level, item, src = pending.pop()

            if visit_once:
                if item in seen:
                    continue
                seen.add(item)

            # With as_edge, hand out (src, item); src is None for the root.
            rval: Union[TraversedTup, "Traversable", "Blob"] = (src, item) if as_edge else item

            if prune(rval, level):
                continue

            is_ignored_root = ignore_self and (item is self)
            if not is_ignored_root and predicate(rval, level):
                yield rval

            # Only descend further while the depth limit, if any, allows it.
            child_level = level + 1
            if depth <= -1 or child_level <= depth:
                enqueue_children(item, child_level)

577 

578 

@runtime_checkable
class Serializable(Protocol):
    """Interface for objects that can be written to, and read back from, a data
    stream."""

    __slots__ = ()

    # @abstractmethod
    def _serialize(self, stream: "BytesIO") -> "Serializable":
        """Write the data of this object into the given data stream.

        :note:
            A serialized object would :meth:`_deserialize` into the same object.

        :param stream:
            A file-like object.

        :return:
            self

        :raise NotImplementedError:
            Always, unless overridden in a subclass.
        """
        raise NotImplementedError("To be implemented in subclass")

    # @abstractmethod
    def _deserialize(self, stream: "BytesIO") -> "Serializable":
        """Read all information regarding this object from the stream.

        :param stream:
            A file-like object.

        :return:
            self

        :raise NotImplementedError:
            Always, unless overridden in a subclass.
        """
        raise NotImplementedError("To be implemented in subclass")

612 

613 

class TraversableIterableObj(IterableObj, Traversable):
    """Combination of :class:`~git.util.IterableObj` and :class:`Traversable` whose
    traversal methods are typed in terms of the concrete subclass."""

    __slots__ = ()

    # An edge as yielded with as_edge=True: (source or None, destination).
    TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]

    def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
        """For documentation, see :meth:`Traversable._list_traverse`."""
        return super()._list_traverse(*args, **kwargs)

    @overload
    def traverse(self: T_TIobj) -> Iterator[T_TIobj]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[False],
    ) -> Iterator[T_TIobj]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[False],
        as_edge: Literal[True],
    ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ...

    @overload
    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
        depth: int,
        branch_first: bool,
        visit_once: bool,
        ignore_self: Literal[True],
        as_edge: Literal[True],
    ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: ...

    def traverse(
        self: T_TIobj,
        predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: True,
        prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = True,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]]:
        """For documentation, see :meth:`Traversable._traverse`."""
        # The result is narrowed with cast() rather than checked at runtime: checking
        # would mean consuming (a copy of) the iterator to inspect every item.
        traversal = super()._traverse(
            predicate,  # type: ignore[arg-type]
            prune,  # type: ignore[arg-type]
            depth,
            branch_first,
            visit_once,
            ignore_self,
            as_edge,
        )
        return cast(
            Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
            traversal,
        )