Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/refs.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

612 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22 

23"""Ref handling.""" 

24 

25import os 

26import warnings 

27from collections.abc import Iterator 

28from contextlib import suppress 

29from typing import Any, Optional, Union 

30 

31from .errors import PackedRefsException, RefFormatError 

32from .file import GitFile, ensure_dir_exists 

33from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

34from .pack import ObjectContainer 

35 

# Ref names are handled as plain byte strings throughout this module.
Ref = bytes

# The one ref name that RefsContainer accepts outside of the "refs/" tree.
HEADREF = b"HEAD"
# Prefix that marks the contents of a ref as pointing at another ref.
SYMREF = b"ref: "

# Well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"

# Byte values (as ints) that check_ref_format() rejects anywhere in a name.
BAD_REF_CHARS = set(b"\177 ~^:?*[")

PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

49 

50 

class SymrefLoop(Exception):
    """There is a loop between one or more symrefs.

    The constructor arguments are kept verbatim on the instance as
    ``ref`` and ``depth``.
    """

    def __init__(self, ref, depth) -> None:
        self.ref, self.depth = ref, depth

57 

58 

def parse_symref_value(contents: bytes) -> bytes:
    """Extract the target of a symbolic ref from its raw contents.

    Args:
      contents: Raw ref contents, expected to begin with the SYMREF marker
    Returns: Everything after the marker, with trailing CR/LF bytes removed

    Raises:
      ValueError: if contents does not begin with the SYMREF marker
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    target = contents[len(SYMREF) :]
    return target.rstrip(b"\r\n")

69 

70 

def check_ref_format(refname: Ref) -> bool:
    """Check if a refname is correctly formatted.

    The individual checks are modelled on git-check-ref-format[1] and are
    applied in this order; the name is rejected when it

    * starts with "." or contains "/.",
    * contains no "/" at all,
    * contains "..",
    * contains a byte below 0o40 or one listed in BAD_REF_CHARS,
    * ends with "/" or ".",
    * ends with ".lock",
    * contains "@{" or a backslash.

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Kept as separate tests (rather than one big expression) so that each
    # one can be matched against a rule in [1].
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # Iterating bytes yields ints, which is what BAD_REF_CHARS contains.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # refname is known to be non-empty here: it contains at least one "/".
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    return not (b"@{" in refname or b"\\" in refname)

103 

104 

class RefsContainer:
    """A container for refs.

    This base class provides the logic that is independent of how refs are
    stored: symref following, dict-style access, key enumeration and bulk
    import. The storage primitives (allkeys, read_loose_ref,
    get_packed_refs, add_packed_refs, set_symbolic_ref, set_if_equals,
    add_if_new, remove_if_equals and pack_refs) raise NotImplementedError
    here and are meant to be supplied by subclasses.
    """

    def __init__(self, logger=None) -> None:
        """Create a container.

        Args:
          logger: Optional callable invoked by _log() as
            ``logger(ref, old_sha, new_sha, committer, timestamp, timezone,
            message)``.
        """
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Forward a ref change to the logger, if there is something to log.

        Nothing happens when no logger was configured or when message is
        None.
        """
        if self._logger is None or message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self) -> dict[Ref, ObjectID]:
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name) -> Optional[ObjectID]:
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
            This base implementation has no cache and always returns None.
        """
        return None

    def import_refs(
        self,
        base: Ref,
        other: dict[Ref, Optional[ObjectID]],
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ) -> None:
        """Import a mapping of refs underneath a common base.

        Args:
          base: Prefix; each name in other is joined to it with "/"
          other: Mapping of names (relative to base) to values. A value of
            None requests removal of that ref.
          committer: Accepted but currently not forwarded
          timestamp: Accepted but currently not forwarded
          timezone: Accepted but currently not forwarded
          message: Passed on to set_if_equals() / remove_if_equals()
          prune: If True, every ref currently found under base that is not
            given a value by other is removed as well
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                # Explicit removal request: keep it queued for deletion.
                to_delete.add(name)
                continue
            self.set_if_equals(b"/".join((base, name)), None, value, message=message)
            # A ref that has just been written must survive pruning.
            to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(b"/".join((base, ref)), None, message=message)

    def allkeys(self) -> Iterator[Ref]:
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        """Iterate over the names returned by allkeys()."""
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: The result of subkeys(base) when a base is given, otherwise
            the result of allkeys().
        """
        if base is None:
            return self.allkeys()
        return self.subkeys(base)

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under; a trailing "/" is optional.
        Returns: A set of refs in this container under the base; the base
            prefix (including the separating "/") is stripped from the ref
            names returned.
        """
        # Match on whole path components only, so that b"refs/heads" does not
        # pick up b"refs/headsX/...", and so that a base given with or without
        # a trailing slash strips exactly the same prefix.
        stem = base.rstrip(b"/")
        prefix = stem + b"/" if stem else b""
        return {
            refname[len(prefix) :]
            for refname in self.allkeys()
            if refname.startswith(prefix)
        }

    def as_dict(self, base=None) -> dict[Ref, ObjectID]:
        """Return the contents of this container as a dictionary.

        Args:
          base: Optional base; when given, keys are relative to it.
        Returns: Mapping of ref name to the value obtained by resolving it
            through this container. Refs that cannot be resolved (missing
            target or symref loop) are left out.
        """
        names = self.keys(base)
        stem = b"" if base is None else base.rstrip(b"/")
        resolved = {}
        for key in names:
            try:
                resolved[key] = self[(stem + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve
        return resolved

    def _check_refname(self, name) -> None:
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).
        b"refs/stash" is accepted explicitly as well.

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        The loose ref is consulted first; the packed refs are only used when
        the loose lookup yields nothing.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name) -> Optional[bytes]:
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name) -> tuple[list[bytes], Optional[bytes]]:
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain. sha is whatever the last ref in the
            chain contains, which is None when that ref does not exist.

        Raises:
          SymrefLoop: if more than five refs in a row had contents
        """
        # Seed the loop as if `name` had been found inside a symref.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        """Return True if read_ref() finds non-empty contents for refname."""
        return bool(self.read_ref(refname))

    def __getitem__(self, name) -> ObjectID:
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the chain ends in a ref that does not exist
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ) -> bool:
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.

        Raises:
          ValueError: if ref is neither a valid hex sha nor a symref value
        """
        if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
            raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        symrefs = {}
        for src in self.allkeys():
            contents = self.read_ref(src)
            if contents is None:
                # Listed by allkeys() but not readable; it cannot be
                # classified, so it is left out.
                continue
            try:
                symrefs[src] = parse_symref_value(contents)
            except ValueError:
                # Not a symref.
                pass
        return symrefs

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        raise NotImplementedError(self.pack_refs)

448 

449 

class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    This container does not support symbolic or packed references and is not
    threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        """Wrap an existing mapping.

        Args:
          refs: Mapping of ref name to value; it is used directly (not
            copied), so changes made through this container show up in it.
          logger: Optional reflog callable, see RefsContainer.
        """
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        # Objects whose _notify() is called after every change.
        self._watchers: set[Any] = set()

    def allkeys(self):
        """Return the keys view of the backing dict."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the stored value for name, or None if there is none."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; this container has no packed refs."""
        return {}

    def _notify(self, ref, newsha) -> None:
        """Tell every watcher about a change as a (ref, newsha) tuple."""
        event = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(event)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Store ``SYMREF + other`` under name, then notify and log.

        The value logged as "old" is what name resolved to before the
        change.
        """
        _, previous = self.follow(name)
        target = SYMREF + other
        self._refs[name] = target
        self._notify(name, target)
        self._log(
            name,
            previous,
            target,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set name to new_ref if its stored value equals old_ref.

        A missing entry compares as ZERO_SHA. With old_ref=None the value is
        set unconditionally.

        Returns: True if the value was stored, False if the comparison failed.

        Raises:
          RefFormatError: if the comparison passed but name is not acceptable
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Only update the specific ref requested, not the whole chain
        self._check_refname(name)
        previous = self._refs.get(name)
        self._refs[name] = new_ref
        self._notify(name, new_ref)
        self._log(
            name,
            previous,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Store ref under name unless the backing dict already has name.

        Returns: True if the entry was added, False if it already existed.
        """
        if name in self._refs:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove name if its stored value equals old_ref.

        A missing entry compares as ZERO_SHA. With old_ref=None the entry is
        removed unconditionally. Watchers and the logger are only informed
        when an entry was actually removed.

        Returns: False if the comparison failed, True otherwise (also when
            there was nothing to remove).
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        """Return the peeled value recorded for name, or None."""
        return self._peeled.get(name)

    def _update(self, refs) -> None:
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for ref, sha in refs.items():
            self.set_if_equals(ref, None, sha)

    def _update_peeled(self, peeled) -> None:
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

592 

593 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Load all refs from f.

        Args:
          f: Handed to read_info_refs(); the result is split into plain and
            peeled entries with split_peeled_refs().
        """
        # Run the base initialiser so that the attributes it defines
        # (notably _logger) exist on this container as well.
        super().__init__()
        self._refs, self._peeled = split_peeled_refs(read_info_refs(f))

    def allkeys(self):
        """Return the keys view of the loaded refs."""
        return self._refs.keys()

    def read_loose_ref(self, name):
        """Return the loaded value for name, or None if there is none."""
        return self._refs.get(name, None)

    def get_packed_refs(self):
        """Return an empty dict; packed refs are not tracked separately."""
        return {}

    def get_peeled(self, name):
        """Return the peeled value for name, else its plain value.

        Raises:
          KeyError: if name is in neither the peeled nor the plain mapping
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

617 

618 

class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(
        self,
        path: Union[str, bytes, os.PathLike],
        worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
        logger=None,
    ) -> None:
        """Create a container rooted at path.

        Args:
          path: Directory holding the refs and the packed-refs file
          worktree_path: Directory holding HEAD; defaults to path
          logger: Optional reflog callable, see RefsContainer
        """
        super().__init__(logger=logger)
        # Convert path-like objects to strings, then to bytes for Git compatibility
        self.path = os.fsencode(os.fspath(path))
        if worktree_path is None:
            self.worktree_path = self.path
        else:
            self.worktree_path = os.fsencode(os.fspath(worktree_path))
        # Both caches are filled together by get_packed_refs().
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Return the names of loose and packed refs under base.

        Args:
          base: The base to return refs under.
        Returns: A set of ref names with the base prefix stripped.
        """
        found = set()
        top = self.refpath(base)
        for root, unused_dirs, files in os.walk(top):
            rel_dir = root[len(top) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            rel_dir = rel_dir.strip(b"/")
            for filename in files:
                refname = b"/".join(([rel_dir] if rel_dir else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    found.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                found.add(key[len(base) :].strip(b"/"))
        return found

    def allkeys(self):
        """Return the set of all ref names: HEAD, loose refs and packed refs."""
        names = set()
        if os.path.exists(self.refpath(HEADREF)):
            names.add(HEADREF)
        # refpath(b"") ends with a separator, so slicing it off a walked
        # directory leaves a name relative to the container root.
        root_path = self.refpath(b"")
        for root, unused_dirs, files in os.walk(self.refpath(b"refs")):
            rel_dir = root[len(root_path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([rel_dir, filename])
                if check_ref_format(refname):
                    names.add(refname)
        names.update(self.get_packed_refs())
        return names

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        The file is parsed on first use and the result is cached on the
        instance.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty file has no first line; treat that like a file
                # without a header instead of leaking StopIteration.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref

        Raises:
          ValueError: if HEAD is among the refs to pack
        """
        if not new_refs:
            return

        # sanity check, done up front so that nothing has been deleted or
        # locked yet when the request is rejected
        if HEADREF in new_refs:
            raise ValueError("cannot pack HEAD")

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # work on a copy of the (possibly cached) packed refs while
            # holding the lock
            packed_refs = self.get_packed_refs().copy()

            for ref, target in new_refs.items():
                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, self._peeled_refs)

            self._packed_refs = packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        # Known not peelable
        return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file could not
            be read (OSError and UnicodeError are swallowed).
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line; a file that stops right
                    # after the marker simply has an empty target.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                # Read only the first 40 bytes
                return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name) -> None:
        """Drop name from the packed-refs file, if packed refs were loaded."""
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            # Runs on every exit path, including after a successful close().
            f.abort()

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> None:
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change

        Raises:
          RefFormatError: if name or other is not an acceptable ref name
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.

        Raises:
          RefFormatError: if name is not an acceptable ref name
          NotADirectoryError: if a parent path of the target is a packed ref
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise
            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.

        Raises:
          RefFormatError: if the final ref name is not acceptable
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # NOTE(review): the file checked is the one for realname while
            # the packed lookup uses name -- confirm this is intended.
            if os.path.exists(filename) or name in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ) -> bool:
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.

        Raises:
          RefFormatError: if name is not an acceptable ref name
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True

    def pack_refs(self, all: bool = False) -> None:
        """Pack loose refs into packed-refs file.

        HEAD is never packed, and refs that cannot be resolved are skipped.

        Args:
          all: If True, pack all refs. If False, only pack tags.
        """
        refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
        for ref in self.allkeys():
            if ref == HEADREF:
                # Never pack HEAD
                continue
            if not (all or ref.startswith(LOCAL_TAG_PREFIX)):
                continue
            try:
                sha = self[ref]
            except (KeyError, SymrefLoop):
                # Broken ref (dangling or looping), skip it
                continue
            if sha:
                refs_to_pack[ref] = sha

        if refs_to_pack:
            self.add_packed_refs(refs_to_pack)

1087 

1088 

def _split_ref_line(line):
    """Parse one packed-refs entry into a ``(sha, name)`` tuple.

    Trailing CR/LF characters are stripped before the line is split on a
    single space.

    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, if the first field is not a valid hex
        SHA, or if the second field fails the ref name format check.
    """
    parts = line.rstrip(b"\n\r").split(b" ")
    if len(parts) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")

    hexsha, refname = parts
    if not valid_hexsha(hexsha):
        raise PackedRefsException(f"Invalid hex sha {hexsha!r}")
    if not check_ref_format(refname):
        raise PackedRefsException(f"invalid ref name {refname!r}")
    return (hexsha, refname)

1100 

1101 

def read_packed_refs(f):
    """Iterate over the entries of a packed refs file without peeled data.

    Lines starting with "#" are treated as comments and ignored.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled ("^") line is encountered, or if an
        entry cannot be parsed.
    """
    for entry in f:
        if entry.startswith(b"^"):
            # Peeled lines are only legal in files that announce them.
            raise PackedRefsException("found peeled ref in packed-refs without peeled")
        if entry.startswith(b"#"):
            # Comment
            continue
        yield _split_ref_line(entry)

1116 

1117 

def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields
    tuples of (SHA1, ref name, peeled SHA1 or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled ("^") line does not directly follow
        a ref line, if a peeled SHA is not valid hex, or if a ref line
        cannot be parsed.
    """
    # The most recent ref line that has not been yielded yet; it is held back
    # because the following line may carry its peeled value.
    last = None
    for line in f:
        # Indexing bytes yields an int on Python 3, so ``line[0] == b"#"``
        # can never be true; use startswith so comments are really skipped
        # (this is also safe for an empty line).
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            peeled = line[1:]
            if not valid_hexsha(peeled):
                raise PackedRefsException(f"Invalid hex sha {peeled!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, peeled)
        else:
            if last:
                # Previous ref had no peeled line following it.
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        # Flush the final ref, which had no peeled line after it.
        sha, name = _split_ref_line(last)
        yield (sha, name, None)

1148 

1149 

def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
    """Serialize packed refs to a file.

    Refs are written in sorted name order. When ``peeled_refs`` is given
    (even if empty) the "# pack-refs with: peeled" header is written first
    and every ref that has a peeled value is followed by a "^" line.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
    else:
        peeled_refs = {}

    for name in sorted(packed_refs.keys()):
        sha = packed_refs[name]
        f.write(git_line(sha, name))
        if name in peeled_refs:
            peeled_line = b"^" + peeled_refs[name] + b"\n"
            f.write(peeled_line)

1166 

1167 

def read_info_refs(f):
    """Parse info refs lines into a dict mapping ref name to sha.

    Each line is stripped of trailing CR/LF and split on the first tab
    into sha and name. A line without a tab raises ValueError.

    Args:
      f: file-like object providing ``readlines()``
    Returns: dict of ref name to sha
    """
    parsed = {}
    for raw in f.readlines():
        entry = raw.rstrip(b"\r\n")
        sha, name = entry.split(b"\t", 1)
        parsed[name] = sha
    return parsed

1174 

1175 

def write_info_refs(refs, store: ObjectContainer):
    """Generate info refs lines for the given refs.

    Refs are emitted in sorted order as ``<id>`` + tab + ``<name>`` lines.
    HEAD is skipped, as are refs whose sha is not present in ``store``.
    When the peeled object differs from the referenced object, a second
    line carrying the peeled id and the name plus ``PEELED_TAG_SUFFIX`` is
    emitted.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel the shas
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for refname, sha in sorted(refs.items()):
        if refname == HEADREF:
            # get_refs() includes HEAD as a special case, but we don't want
            # to advertise it
            continue
        try:
            obj = store[sha]
        except KeyError:
            # Object is not available locally, so do not advertise the ref.
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield obj.id + b"\t" + refname + b"\n"
        if peeled.id != obj.id:
            yield peeled.id + b"\t" + refname + PEELED_TAG_SUFFIX + b"\n"

1194 

1195 

def is_local_branch(x):
    """Return whether ref name ``x`` starts with the local branch prefix."""
    has_branch_prefix = x.startswith(LOCAL_BRANCH_PREFIX)
    return has_branch_prefix

1198 

1199 

def strip_peeled_refs(refs):
    """Return a new dict of ``refs`` with all peeled entries left out.

    An entry counts as peeled when its name ends with
    ``PEELED_TAG_SUFFIX``. The input mapping is not modified.
    """
    kept = {}
    for name, value in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = value
    return kept

1205 

1206 

def split_peeled_refs(refs):
    """Partition ``refs`` into regular refs and peeled refs.

    Returns: tuple ``(regular, peeled)`` of dicts. Keys in ``peeled`` have
      the ``PEELED_TAG_SUFFIX`` removed, so they match the name of the ref
      they belong to.
    """
    regular = {}
    peeled = {}
    for name, value in refs.items():
        if not name.endswith(PEELED_TAG_SUFFIX):
            regular[name] = value
            continue
        # Drop the suffix so the key is the plain ref name.
        base_name = name[: -len(PEELED_TAG_SUFFIX)]
        peeled[base_name] = value
    return regular, peeled

1217 

1218 

1219def _set_origin_head(refs, origin, origin_head) -> None: 

1220 # set refs/remotes/origin/HEAD 

1221 origin_base = b"refs/remotes/" + origin + b"/" 

1222 if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX): 

1223 origin_ref = origin_base + HEADREF 

1224 target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :] 

1225 if target_ref in refs: 

1226 refs.set_symbolic_ref(origin_ref, target_ref) 

1227 

1228 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: Optional[bytes],
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Pick the ref HEAD should use and create the local ref if needed.

    If ``branch`` is given it must exist either as a remote-tracking branch
    of ``origin`` (a local branch is then added if new) or as a tag.
    Otherwise ``origin_head`` is used; the local ref is added if the ref it
    is derived from can be resolved.

    Returns: the name of the ref that HEAD should refer to.
    Raises:
      ValueError: if ``branch`` is neither a remote branch nor a tag, or
        if neither ``branch`` nor ``origin_head`` is provided.
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"

    if branch:
        tracking_ref = remote_prefix + branch
        if tracking_ref in refs:
            local_ref = LOCAL_BRANCH_PREFIX + branch
            refs.add_if_new(local_ref, refs[tracking_ref], ref_message)
            return local_ref
        tag_ref = LOCAL_TAG_PREFIX + branch
        if tag_ref in refs:
            return tag_ref
        raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")

    if not origin_head:
        raise ValueError("neither origin_head nor branch are provided")

    # Branch heads are looked up under the remote-tracking namespace; any
    # other kind of ref is looked up under its own name.
    if origin_head.startswith(LOCAL_BRANCH_PREFIX):
        source_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    else:
        source_ref = origin_head
    try:
        refs.add_if_new(origin_head, refs[source_ref], ref_message)
    except KeyError:
        # The source ref is not available; still report origin_head.
        pass
    return origin_head

1261 

1262 

def _set_head(refs, head_ref, ref_message):
    """Update HEAD to refer to ``head_ref``.

    For a tag ref, HEAD is deleted and re-created pointing directly at the
    resolved value (detached). For any other ref, HEAD becomes a symbolic
    ref to ``head_ref``.

    Returns: the value HEAD was set to, or None if a KeyError was raised
      while setting HEAD to a branch.
    """
    if not head_ref.startswith(LOCAL_TAG_PREFIX):
        # set HEAD to specific branch
        try:
            target = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, target, message=ref_message)
        except KeyError:
            target = None
        return target

    # detach HEAD at specified tag
    target = refs[head_ref]
    # NOTE(review): confirm whether a refs lookup can ever return a Tag
    # object here; this branch is kept exactly as in the original.
    if isinstance(target, Tag):
        _cls, obj = target.object
        target = obj.get_object(obj).id
    del refs[HEADREF]
    refs.set_if_equals(HEADREF, None, target, message=ref_message)
    return target

1281 

1282 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: dict[bytes, bytes],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
) -> None:
    """Import the branches and tags of a remote into a refs container.

    Branches (``refs/heads/*``) are imported below
    ``refs/remotes/<remote_name>``; tags (``refs/tags/*``) are imported
    below ``refs/tags/``. Peeled entries in ``refs`` are ignored.

    Args:
      refs_container: container to import the refs into
      remote_name: name of the remote; encoded to bytes for the ref path
      refs: mapping of ref name to sha. Keys are bytes, since they are
        matched against the bytes ref prefixes.
      message: passed through to ``import_refs`` for both imports
      prune: passed through to ``import_refs`` for the branch import
      prune_tags: passed through to ``import_refs`` for the tag import
    """
    stripped_refs = strip_peeled_refs(refs)
    branches = {
        n[len(LOCAL_BRANCH_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_BRANCH_PREFIX)
    }
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        branches,
        message=message,
        prune=prune,
    )
    # Peeled entries were already removed by strip_peeled_refs() above, so
    # no additional suffix filtering is needed for the tags.
    tags = {
        n[len(LOCAL_TAG_PREFIX) :]: v
        for (n, v) in stripped_refs.items()
        if n.startswith(LOCAL_TAG_PREFIX)
    }
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
    )

1311 

1312 

def serialize_refs(store, refs):
    """Build a dict of ref name to object id, including peeled tag entries.

    Every ref is peeled through ``store``. A ref whose sha cannot be found
    is reported with a UserWarning and left out. For refs that point at a
    Tag, an extra entry keyed by the name plus ``PEELED_TAG_SUFFIX`` holds
    the id of the peeled object.

    Args:
      store: object store used to peel the shas
      refs: dict of ref name to sha
    Returns: dict of ref name to object id
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for name, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    name.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            serialized[name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[name] = unpeeled.id
    return serialized