Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/refs.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

596 statements  

1# refs.py -- For dealing with git refs 

2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

5# General Public License as published by the Free Software Foundation; version 2.0 

6# or (at your option) any later version. You can redistribute it and/or 

7# modify it under the terms of either of these two licenses. 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# You should have received a copy of the licenses; if not, see 

16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

18# License, Version 2.0. 

19# 

20 

21 

22"""Ref handling.""" 

23 

24import os 

25import warnings 

26from contextlib import suppress 

27from typing import Any, Dict, Optional, Set 

28 

29from .errors import PackedRefsException, RefFormatError 

30from .file import GitFile, ensure_dir_exists 

31from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha 

32from .pack import ObjectContainer 

33 

# Ref names are plain byte strings.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Prefix that marks the contents of a ref as pointing at another ref.
SYMREF = b"ref: "
# Prefixes of the ref namespaces for local branches, tags and remotes.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
# Byte values rejected by check_ref_format: DEL, space and ~ ^ : ? * [
BAD_REF_CHARS = set(b"\x7f ~^:?*[")
# Suffix appended to a ref name to denote its peeled value.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX

46 

47 

class SymrefLoop(Exception):
    """There is a loop between one or more symrefs."""

    def __init__(self, ref, depth) -> None:
        """Record the ref that was being followed and the depth reached.

        Args:
          ref: Name of the ref whose resolution was attempted
          depth: Number of symref hops taken before giving up
        """
        # Hand the values to Exception as well so that args, str() and
        # repr() of the exception are not empty.
        super().__init__(ref, depth)
        self.ref = ref
        self.depth = depth

54 

55 

def parse_symref_value(contents):
    """Extract the target of a symbolic ref from its raw contents.

    Args:
      contents: Raw contents of a ref, expected to start with ``SYMREF``
    Returns: The destination ref name, without trailing CR/LF characters
    Raises:
      ValueError: if the contents do not start with ``SYMREF``
    """
    if not contents.startswith(SYMREF):
        raise ValueError(contents)
    destination = contents[len(SYMREF) :]
    return destination.rstrip(b"\r\n")

66 

67 

def check_ref_format(refname: Ref):
    """Check if a refname is correctly formatted.

    The checks follow the rules listed by git-check-ref-format[1].

    [1]
    http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html

    Args:
      refname: The refname to check
    Returns: True if refname is valid, False otherwise
    """
    # Kept as separate checks, in the same order as [1].
    if refname.startswith(b".") or b"/." in refname:
        return False
    if b"/" not in refname:
        return False
    if b".." in refname:
        return False
    # No control characters and none of the explicitly forbidden bytes.
    if any(byte < 0o40 or byte in BAD_REF_CHARS for byte in refname):
        return False
    # refname is known to be non-empty here since it contains a slash.
    if refname.endswith((b"/", b".")):
        return False
    if refname.endswith(b".lock"):
        return False
    if b"@{" in refname:
        return False
    return b"\\" not in refname

100 

101 

class RefsContainer:
    """A container for refs.

    This base class implements the lookup and convenience logic; storage is
    provided by subclasses, which implement the methods that raise
    NotImplementedError here.
    """

    def __init__(self, logger=None) -> None:
        # Optional callable that _log() forwards every ref change to.
        self._logger = logger

    def _log(
        self,
        ref,
        old_sha,
        new_sha,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Report a ref change to the logger.

        Nothing is reported when no logger was given to the constructor or
        when no message is supplied.
        """
        if self._logger is None:
            return
        if message is None:
            return
        self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message
        """
        raise NotImplementedError(self.set_symbolic_ref)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        raise NotImplementedError(self.get_packed_refs)

    def add_packed_refs(self, new_refs: "Dict[Ref, Optional[ObjectID]]"):
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        """
        raise NotImplementedError(self.add_packed_refs)

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        return None

    def import_refs(
        self,
        base: "Ref",
        other: "Dict[Ref, Optional[ObjectID]]",
        committer: Optional[bytes] = None,
        timestamp: Optional[bytes] = None,
        timezone: Optional[bytes] = None,
        message: Optional[bytes] = None,
        prune: bool = False,
    ):
        """Import a set of refs underneath a base.

        Args:
          base: Prefix under which the refs are stored; names in ``other``
            are joined to it with a slash
          other: Mapping of ref names (relative to ``base``) to SHAs; a
            value of None requests deletion of that ref
          message: Optional message passed on to the set/remove calls
          prune: If True, also delete every ref currently found under
            ``base`` that is not written by this call
        """
        to_delete = set(self.subkeys(base)) if prune else set()
        for name, value in other.items():
            if value is None:
                to_delete.add(name)
            else:
                self.set_if_equals(
                    b"/".join((base, name)), None, value, message=message
                )
                # Only a ref that was actually written is exempt from
                # deletion; a name whose value is None has to stay in
                # to_delete, otherwise the requested removal is lost.
                to_delete.discard(name)
        for ref in to_delete:
            self.remove_if_equals(b"/".join((base, ref)), None, message=message)

    def allkeys(self):
        """All refs present in this container."""
        raise NotImplementedError(self.allkeys)

    def __iter__(self):
        return iter(self.allkeys())

    def keys(self, base=None):
        """Refs present in this container.

        Args:
          base: An optional base to return refs under.
        Returns: An unsorted set of valid refs in this container, including
            packed refs.
        """
        if base is not None:
            return self.subkeys(base)
        return self.allkeys()

    def subkeys(self, base):
        """Refs present in this container under a base.

        Args:
          base: The base to return refs under.
        Returns: A set of valid refs in this container under the base; the base
            prefix is stripped from the ref names returned.
        """
        keys = set()
        # +1 also drops the slash that separates the base from the name.
        base_len = len(base) + 1
        for refname in self.allkeys():
            if refname.startswith(base):
                keys.add(refname[base_len:])
        return keys

    def as_dict(self, base=None):
        """Return the contents of this container as a dictionary.

        Refs that cannot be resolved (missing target or symref loop) are
        left out of the result.
        """
        ret = {}
        keys = self.keys(base)
        if base is None:
            base = b""
        else:
            base = base.rstrip(b"/")
        for key in keys:
            try:
                ret[key] = self[(base + b"/" + key).strip(b"/")]
            except (SymrefLoop, KeyError):
                continue  # Unable to resolve

        return ret

    def _check_refname(self, name):
        """Ensure a refname is valid and lives in refs or is HEAD.

        HEAD is not a valid refname according to git-check-ref-format, but this
        class needs to be able to touch HEAD. Also, check_ref_format expects
        refnames without the leading 'refs/', but this class requires that
        so it cannot touch anything outside the refs dir (or HEAD).

        Args:
          name: The name of the reference.

        Raises:
          RefFormatError: if a refname is not HEAD or is otherwise not valid.
        """
        if name in (HEADREF, b"refs/stash"):
            return
        if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
            raise RefFormatError(name)

    def read_ref(self, refname):
        """Read a reference without following any references.

        Args:
          refname: The name of the reference
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        contents = self.read_loose_ref(refname)
        if not contents:
            # Fall back to the packed refs when there is no loose ref.
            contents = self.get_packed_refs().get(refname, None)
        return contents

    def read_loose_ref(self, name):
        """Read a loose reference and return its contents.

        Args:
          name: the refname to read
        Returns: The contents of the ref file, or None if it does
            not exist.
        """
        raise NotImplementedError(self.read_loose_ref)

    def follow(self, name):
        """Follow a reference name.

        Returns: a tuple of (refnames, sha), wheres refnames are the names of
            references in the chain
        Raises:
          SymrefLoop: if more than five symrefs had to be followed
        """
        # Treat the starting name as if it were the target of a symref so
        # that the loop below handles it like every other hop.
        contents = SYMREF + name
        depth = 0
        refnames = []
        while contents.startswith(SYMREF):
            refname = contents[len(SYMREF) :]
            refnames.append(refname)
            contents = self.read_ref(refname)
            if not contents:
                break
            depth += 1
            if depth > 5:
                raise SymrefLoop(name, depth)
        return refnames, contents

    def __contains__(self, refname) -> bool:
        if self.read_ref(refname):
            return True
        return False

    def __getitem__(self, name):
        """Get the SHA1 for a reference name.

        This method follows all symbolic references.

        Raises:
          KeyError: if the reference does not resolve to a value
        """
        _, sha = self.follow(name)
        if sha is None:
            raise KeyError(name)
        return sha

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references if applicable for the
        subclass, and can be used to perform an atomic compare-and-swap
        operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Message for reflog
        Returns: True if the set was successful, False otherwise.
        """
        raise NotImplementedError(self.set_if_equals)

    def add_if_new(
        self, name, ref, committer=None, timestamp=None, timezone=None, message=None
    ):
        """Add a new reference only if it does not already exist.

        Args:
          name: Ref name
          ref: Ref value
        """
        raise NotImplementedError(self.add_if_new)

    def __setitem__(self, name, ref) -> None:
        """Set a reference name to point to the given SHA1.

        This method follows all symbolic references if applicable for the
        subclass.

        Note: This method unconditionally overwrites the contents of a
            reference. To update atomically only if the reference has not
            changed, use set_if_equals().

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
        """
        self.set_if_equals(name, None, ref)

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references, even if applicable for
        the subclass. It can be used to perform an atomic compare-and-delete
        operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Message for reflog
        Returns: True if the delete was successful, False otherwise.
        """
        raise NotImplementedError(self.remove_if_equals)

    def __delitem__(self, name) -> None:
        """Remove a refname.

        This method does not follow symbolic references, even if applicable for
        the subclass.

        Note: This method unconditionally deletes the contents of a reference.
            To delete atomically only if the reference has not changed, use
            remove_if_equals().

        Args:
          name: The refname to delete.
        """
        self.remove_if_equals(name, None)

    def get_symrefs(self):
        """Get a dict with all symrefs in this container.

        Returns: Dictionary mapping source ref to target ref
        """
        ret = {}
        for src in self.allkeys():
            try:
                dst = parse_symref_value(self.read_ref(src))
            except ValueError:
                # Not a symbolic ref
                pass
            else:
                ret[src] = dst
        return ret

435 

436 

class DictRefsContainer(RefsContainer):
    """RefsContainer backed by a simple dict.

    This container does not support symbolic or packed references and is not
    threadsafe.
    """

    def __init__(self, refs, logger=None) -> None:
        super().__init__(logger=logger)
        # Mapping of ref name to contents; mutated in place by this class.
        self._refs = refs
        # Peeled values, filled only through _update_peeled().
        self._peeled: Dict[bytes, ObjectID] = {}
        # Objects whose _notify() is called on every change.
        self._watchers: Set[Any] = set()

    def allkeys(self):
        return self._refs.keys()

    def read_loose_ref(self, name):
        return self._refs.get(name, None)

    def get_packed_refs(self):
        return {}

    def _notify(self, ref, newsha):
        """Tell every registered watcher that ref now has value newsha."""
        change = (ref, newsha)
        for watcher in self._watchers:
            watcher._notify(change)

    def set_symbolic_ref(
        self,
        name: Ref,
        other: Ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Store ``SYMREF + other`` as the contents of ``name``."""
        # Last element of follow()'s result is the value resolved so far.
        previous = self.follow(name)[-1]
        contents = SYMREF + other
        self._refs[name] = contents
        self._notify(name, contents)
        self._log(
            name,
            previous,
            contents,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set ``name`` to ``new_ref`` if its stored value equals ``old_ref``.

        Returns: False if ``old_ref`` was given and does not match the value
            stored for ``name``; True otherwise.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        chain, _ = self.follow(name)
        # Every name returned by follow() is written, not just the last one.
        for target in chain:
            self._check_refname(target)
            previous = self._refs.get(target)
            self._refs[target] = new_ref
            self._notify(target, new_ref)
            self._log(
                target,
                previous,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: Ref,
        ref: ObjectID,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ):
        """Store ``ref`` under ``name`` unless ``name`` is already present.

        Returns: True if the ref was added, False if it already existed.
        """
        already_present = name in self._refs
        if already_present:
            return False
        self._refs[name] = ref
        self._notify(name, ref)
        self._log(
            name,
            None,
            ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove ``name`` if its stored value equals ``old_ref``.

        Returns: False if ``old_ref`` was given and does not match; True
            otherwise, including when ``name`` was not present at all.
        """
        if old_ref is not None:
            current = self._refs.get(name, ZERO_SHA)
            if current != old_ref:
                return False
        # Watchers and the logger only hear about refs that really existed.
        if name in self._refs:
            previous = self._refs.pop(name)
            self._notify(name, None)
            self._log(
                name,
                previous,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def get_peeled(self, name):
        return self._peeled.get(name)

    def _update(self, refs):
        """Update multiple refs; intended only for testing."""
        # TODO(dborowitz): replace this with a public function that uses
        # set_if_equal.
        for refname in refs:
            self.set_if_equals(refname, None, refs[refname])

    def _update_peeled(self, peeled):
        """Update cached peeled refs; intended only for testing."""
        self._peeled.update(peeled)

580 

581 

class InfoRefsContainer(RefsContainer):
    """Refs container that reads refs from a info/refs file."""

    def __init__(self, f) -> None:
        """Parse the refs listed in ``f``.

        Args:
          f: file-like object yielding ``<sha>\\t<name>`` byte lines; a name
            ending in the peeled-tag suffix supplies the peeled value of
            the ref named by the part before the suffix
        Raises:
          ValueError: if a ref name is rejected by check_ref_format
        """
        # Initialise the base class so that attributes it relies on
        # (such as the logger) exist on this instance as well.
        super().__init__()
        self._refs = {}
        self._peeled = {}
        for line in f.readlines():
            sha, name = line.rstrip(b"\n").split(b"\t")
            is_peeled = name.endswith(PEELED_TAG_SUFFIX)
            if is_peeled:
                name = name[: -len(PEELED_TAG_SUFFIX)]
            if not check_ref_format(name):
                raise ValueError(f"invalid ref name {name!r}")
            if is_peeled:
                self._peeled[name] = sha
            else:
                self._refs[name] = sha

    def allkeys(self):
        return self._refs.keys()

    def read_loose_ref(self, name):
        return self._refs.get(name, None)

    def get_packed_refs(self):
        return {}

    def get_peeled(self, name):
        """Return the peeled value of ``name``.

        Falls back to the plain value of the ref when no peeled entry was
        read for it.

        Raises:
          KeyError: if ``name`` is not known to this container at all
        """
        try:
            return self._peeled[name]
        except KeyError:
            return self._refs[name]

614 

615 

class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk."""

    def __init__(self, path, worktree_path=None, logger=None) -> None:
        """Create a container for the refs stored below ``path``.

        Args:
          path: Directory that holds the refs and the packed-refs file
          worktree_path: Directory that holds HEAD; defaults to ``path``
          logger: Optional callable passed on to RefsContainer
        """
        super().__init__(logger=logger)
        # Paths are kept as bytes; anything with an encode() method is
        # converted with os.fsencode.
        if getattr(path, "encode", None) is not None:
            path = os.fsencode(path)
        self.path = path
        if worktree_path is None:
            worktree_path = path
        if getattr(worktree_path, "encode", None) is not None:
            worktree_path = os.fsencode(worktree_path)
        self.worktree_path = worktree_path
        # Both caches are filled together by get_packed_refs().
        self._packed_refs = None
        self._peeled_refs = None

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path!r})"

    def subkeys(self, base):
        """Return the names of loose and packed refs found under ``base``.

        The names are returned relative to ``base``.
        """
        subkeys = set()
        path = self.refpath(base)
        for root, unused_dirs, files in os.walk(path):
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            rel_dir = rel_dir.strip(b"/")
            for filename in files:
                refname = b"/".join(([rel_dir] if rel_dir else []) + [filename])
                # check_ref_format requires at least one /, so we prepend the
                # base before calling it.
                if check_ref_format(base + b"/" + refname):
                    subkeys.add(refname)
        for key in self.get_packed_refs():
            if key.startswith(base):
                subkeys.add(key[len(base) :].strip(b"/"))
        return subkeys

    def allkeys(self):
        """Return HEAD (if its file exists) plus all loose and packed refs."""
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)
        path = self.refpath(b"")
        refspath = self.refpath(b"refs")
        for root, unused_dirs, files in os.walk(refspath):
            rel_dir = root[len(path) :]
            if os.path.sep != "/":
                rel_dir = rel_dir.replace(os.fsencode(os.path.sep), b"/")
            for filename in files:
                refname = b"/".join([rel_dir, filename])
                if check_ref_format(refname):
                    allkeys.add(refname)
        allkeys.update(self.get_packed_refs())
        return allkeys

    def refpath(self, name):
        """Return the disk path of a ref."""
        if os.path.sep != "/":
            name = name.replace(b"/", os.fsencode(os.path.sep))
        # TODO: as the 'HEAD' reference is working tree specific, it
        # should actually not be a part of RefsContainer
        if name == HEADREF:
            return os.path.join(self.worktree_path, name)
        else:
            return os.path.join(self.path, name)

    def get_packed_refs(self):
        """Get contents of the packed-refs file.

        Returns: Dictionary mapping ref names to SHA1s

        Note: Will return an empty dictionary when no packed-refs file is
            present.
        """
        # TODO: invalidate the cache on repacking
        if self._packed_refs is None:
            # set both to empty because we want _peeled_refs to be
            # None if and only if _packed_refs is also None.
            self._packed_refs = {}
            self._peeled_refs = {}
            path = os.path.join(self.path, b"packed-refs")
            try:
                f = GitFile(path, "rb")
            except FileNotFoundError:
                return {}
            with f:
                # An empty packed-refs file has no first line; treat that
                # like a file without a header instead of letting
                # StopIteration escape.
                first_line = next(iter(f), b"").rstrip()
                if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
                    for sha, name, peeled in read_packed_refs_with_peeled(f):
                        self._packed_refs[name] = sha
                        if peeled:
                            self._peeled_refs[name] = peeled
                else:
                    f.seek(0)
                    for sha, name in read_packed_refs(f):
                        self._packed_refs[name] = sha
        return self._packed_refs

    def add_packed_refs(self, new_refs: Dict[Ref, Optional[ObjectID]]):
        """Add the given refs as packed refs.

        Args:
          new_refs: A mapping of ref names to targets; if a target is None that
            means remove the ref
        Raises:
          ValueError: if ``new_refs`` contains HEAD
        """
        if not new_refs:
            return

        path = os.path.join(self.path, b"packed-refs")

        with GitFile(path, "wb") as f:
            # reread cached refs from disk, while holding the lock
            packed_refs = self.get_packed_refs().copy()

            for ref, target in new_refs.items():
                # sanity check
                if ref == HEADREF:
                    raise ValueError("cannot pack HEAD")

                # remove any loose refs pointing to this one -- please
                # note that this bypasses remove_if_equals as we don't
                # want to affect packed refs in here
                with suppress(OSError):
                    os.remove(self.refpath(ref))

                if target is not None:
                    packed_refs[ref] = target
                else:
                    packed_refs.pop(ref, None)

            write_packed_refs(f, packed_refs, self._peeled_refs)

            self._packed_refs = packed_refs

    def get_peeled(self, name):
        """Return the cached peeled value of a ref, if available.

        Args:
          name: Name of the ref to peel
        Returns: The peeled value of the ref. If the ref is known not point to
            a tag, this will be the SHA the ref refers to. If the ref may point
            to a tag, but no cached information is available, None is returned.
        """
        self.get_packed_refs()
        if self._peeled_refs is None or name not in self._packed_refs:
            # No cache: no peeled refs were read, or this ref is loose
            return None
        if name in self._peeled_refs:
            return self._peeled_refs[name]
        else:
            # Known not peelable
            return self[name]

    def read_loose_ref(self, name):
        """Read a reference file and return its contents.

        If the reference file a symbolic reference, only read the first line of
        the file. Otherwise, only read the first 40 bytes.

        Args:
          name: the refname to read, relative to refpath
        Returns: The contents of the ref file, or None if the file could not
            be read (OSError and UnicodeError are both treated that way).
        """
        filename = self.refpath(name)
        try:
            with GitFile(filename, "rb") as f:
                header = f.read(len(SYMREF))
                if header == SYMREF:
                    # Read only the first line. A file that ends right
                    # after the marker has no further line; use an empty
                    # remainder instead of raising StopIteration.
                    return header + next(iter(f), b"").rstrip(b"\r\n")
                else:
                    # Read only the first 40 bytes
                    return header + f.read(40 - len(SYMREF))
        except (OSError, UnicodeError):
            # don't assume anything specific about the error; in
            # particular, invalid or forbidden paths can raise weird
            # errors depending on the specific operating system
            return None

    def _remove_packed_ref(self, name):
        """Rewrite the packed-refs file without ``name``, if it is listed."""
        if self._packed_refs is None:
            return
        filename = os.path.join(self.path, b"packed-refs")
        # reread cached refs from disk, while holding the lock
        f = GitFile(filename, "wb")
        try:
            self._packed_refs = None
            self.get_packed_refs()

            if name not in self._packed_refs:
                return

            del self._packed_refs[name]
            with suppress(KeyError):
                del self._peeled_refs[name]
            write_packed_refs(f, self._packed_refs, self._peeled_refs)
            f.close()
        finally:
            # Always called, including after the close() above and on the
            # early return.
            f.abort()

    def set_symbolic_ref(
        self,
        name,
        other,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Make a ref point at another ref.

        Args:
          name: Name of the ref to set
          other: Name of the ref to point at
          message: Optional message to describe the change
        """
        self._check_refname(name)
        self._check_refname(other)
        filename = self.refpath(name)
        f = GitFile(filename, "wb")
        try:
            f.write(SYMREF + other + b"\n")
            sha = self.follow(name)[-1]
            self._log(
                name,
                sha,
                sha,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        except BaseException:
            f.abort()
            raise
        else:
            f.close()

    def set_if_equals(
        self,
        name,
        old_ref,
        new_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          message: Set message for reflog
        Returns: True if the set was successful, False otherwise.
        Raises:
          NotADirectoryError: if a parent of the ref name is itself a packed
            ref
        """
        self._check_refname(name)
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise
            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True

    def add_if_new(
        self,
        name: bytes,
        ref: bytes,
        committer=None,
        timestamp=None,
        timezone=None,
        message: Optional[bytes] = None,
    ):
        """Add a new reference only if it does not already exist.

        This method follows symrefs, and only ensures that the last ref in the
        chain does not exist.

        Args:
          name: The refname to set.
          ref: The new sha the refname will refer to.
          message: Optional message for reflog
        Returns: True if the add was successful, False otherwise.
        """
        try:
            realnames, contents = self.follow(name)
            if contents is not None:
                return False
            realname = realnames[-1]
        except (KeyError, IndexError):
            realname = name
        self._check_refname(realname)
        filename = self.refpath(realname)
        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            # The file being written is that of realname, so the packed
            # refs have to be checked for realname as well.
            if os.path.exists(filename) or realname in self.get_packed_refs():
                f.abort()
                return False
            try:
                f.write(ref + b"\n")
            except OSError:
                f.abort()
                raise
            else:
                self._log(
                    name,
                    None,
                    ref,
                    committer=committer,
                    timestamp=timestamp,
                    timezone=timezone,
                    message=message,
                )
        return True

    def remove_if_equals(
        self,
        name,
        old_ref,
        committer=None,
        timestamp=None,
        timezone=None,
        message=None,
    ):
        """Remove a refname only if it currently equals old_ref.

        This method does not follow symbolic references. It can be used to
        perform an atomic compare-and-delete operation.

        Args:
          name: The refname to delete.
          old_ref: The old sha the refname must refer to, or None to
            delete unconditionally.
          message: Optional message
        Returns: True if the delete was successful, False otherwise.
        """
        self._check_refname(name)
        filename = self.refpath(name)
        ensure_dir_exists(os.path.dirname(filename))
        f = GitFile(filename, "wb")
        try:
            if old_ref is not None:
                orig_ref = self.read_loose_ref(name)
                if orig_ref is None:
                    orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
                if orig_ref != old_ref:
                    return False

            # remove the reference file itself
            try:
                found = os.path.lexists(filename)
            except OSError:
                # may only be packed, or otherwise unstorable
                found = False

            if found:
                os.remove(filename)

            self._remove_packed_ref(name)
            self._log(
                name,
                old_ref,
                None,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        finally:
            # never write, we just wanted the lock
            f.abort()

        # outside of the lock, clean-up any parent directory that might now
        # be empty. this ensures that re-creating a reference of the same
        # name of what was previously a directory works as expected
        parent = name
        while True:
            try:
                parent, _ = parent.rsplit(b"/", 1)
            except ValueError:
                break

            if parent == b"refs":
                break
            parent_filename = self.refpath(parent)
            try:
                os.rmdir(parent_filename)
            except OSError:
                # this can be caused by the parent directory being
                # removed by another process, being not empty, etc.
                # in any case, this is non fatal because we already
                # removed the reference, just ignore it
                break

        return True

1058 

1059 

def _split_ref_line(line):
    """Split a packed-refs entry into its SHA1 and ref name.

    Args:
      line: One ``<sha> <name>`` byte line, with or without line ending
    Returns: Tuple of (sha, name)
    Raises:
      PackedRefsException: if the line does not consist of exactly two
        space-separated fields, the sha is not a valid hex sha, or the
        name is rejected by check_ref_format
    """
    stripped = line.rstrip(b"\n\r")
    pieces = stripped.split(b" ")
    if len(pieces) != 2:
        raise PackedRefsException(f"invalid ref line {line!r}")
    sha, name = pieces
    if not valid_hexsha(sha):
        raise PackedRefsException(f"Invalid hex sha {sha!r}")
    if not check_ref_format(name):
        raise PackedRefsException(f"invalid ref name {name!r}")
    return (sha, name)

1071 

1072 

def read_packed_refs(f):
    """Read a packed refs file that carries no peeled entries.

    Lines starting with ``#`` are ignored.

    Args:
      f: file-like object to read from
    Returns: Iterator over tuples with SHA1s and ref names.
    Raises:
      PackedRefsException: if a peeled (``^``) line is encountered
    """
    for entry in f:
        is_comment = entry.startswith(b"#")
        if not is_comment:
            if entry.startswith(b"^"):
                raise PackedRefsException(
                    "found peeled ref in packed-refs without peeled"
                )
            yield _split_ref_line(entry)

1087 

1088 

def read_packed_refs_with_peeled(f):
    """Read a packed refs file including peeled refs.

    Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
    with SHA1s, ref names, and peeled SHA1s (or None).

    Args:
      f: file-like object to read from, seek'ed to the second line
    Raises:
      PackedRefsException: if a peeled (``^``) line does not follow a ref
        line, or carries an invalid hex sha
    """
    # A ref line is held back until the next line shows whether a peeled
    # value belongs to it.
    last = None
    for line in f:
        # Indexing bytes yields an int, which never equals b"#", so the
        # comment test has to be done with startswith(). This also copes
        # with an empty line.
        if line.startswith(b"#"):
            continue
        line = line.rstrip(b"\r\n")
        if line.startswith(b"^"):
            if not last:
                raise PackedRefsException("unexpected peeled ref line")
            if not valid_hexsha(line[1:]):
                raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
            sha, name = _split_ref_line(last)
            last = None
            yield (sha, name, line[1:])
        else:
            if last:
                sha, name = _split_ref_line(last)
                yield (sha, name, None)
            last = line
    if last:
        sha, name = _split_ref_line(last)
        yield (sha, name, None)

1119 

1120 

def write_packed_refs(f, packed_refs, peeled_refs=None):
    """Serialize refs into packed refs format.

    The ``# pack-refs with: peeled`` header is emitted whenever
    ``peeled_refs`` is supplied (even if it is empty). Refs are written in
    sorted name order, each followed by its ``^``-prefixed peeled value when
    one is known.

    Args:
      f: empty file-like object to write to
      packed_refs: dict of refname to sha of packed refs to write
      peeled_refs: dict of refname to peeled value of sha
    """
    if peeled_refs is not None:
        f.write(b"# pack-refs with: peeled\n")
        peeled_lookup = peeled_refs
    else:
        peeled_lookup = {}
    for name in sorted(packed_refs):
        f.write(git_line(packed_refs[name], name))
        if name not in peeled_lookup:
            continue
        f.write(b"^" + peeled_lookup[name] + b"\n")

1137 

1138 

def read_info_refs(f):
    """Parse tab-separated ``<sha>\\t<name>`` lines into a dict.

    Args:
      f: file-like object providing ``readlines()`` that returns bytes lines
    Returns: dict mapping each ref name to its sha; if a name occurs more
      than once, the last occurrence wins
    Raises:
      ValueError: if a line does not contain a tab separator
    """
    # Only the first tab splits, so any further tabs stay in the name.
    pairs = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
    return {name: sha for sha, name in pairs}

1145 

1146 

def write_info_refs(refs, store: ObjectContainer):
    """Generate info refs.

    Yields one ``<id>\\t<name>\\n`` line per ref, in sorted ref order, and an
    additional line with the peeled id and the peeled-tag suffix when the
    peeled object differs from the object the ref points at.

    Args:
      refs: dict of ref name to sha
      store: object container used to look up and peel each sha
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    for ref_name, sha in sorted(refs.items()):
        # get_refs() includes HEAD as a special case, but we don't want to
        # advertise it
        if name_is_head := (ref_name == HEADREF):
            continue
        try:
            obj = store[sha]
        except KeyError:
            # Refs whose target is absent from the store are not listed.
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield obj.id + b"\t" + ref_name + b"\n"
        if obj.id != peeled.id:
            yield peeled.id + b"\t" + ref_name + PEELED_TAG_SUFFIX + b"\n"

1165 

1166 

def is_local_branch(x):
    """Return whether ref name ``x`` starts with the local branch prefix."""
    prefix = LOCAL_BRANCH_PREFIX
    return x.startswith(prefix)

1169 

1170 

def strip_peeled_refs(refs):
    """Return a copy of ``refs`` without entries whose name is a peeled ref.

    Args:
      refs: dict of ref name to sha
    Returns: new dict holding only the names that do not end with the
      peeled-tag suffix
    """
    kept = {}
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = sha
    return kept

1176 

1177 

def _set_origin_head(refs, origin, origin_head):
    """Make ``refs/remotes/<origin>/HEAD`` a symref to the remote's HEAD branch.

    Nothing happens when ``origin_head`` is empty, is not a local branch ref,
    or when the matching remote-tracking ref is not present in ``refs``.

    Args:
      refs: refs container to update
      origin: name of the remote, as bytes
      origin_head: ref name the remote's HEAD points at, or a false value
    """
    # set refs/remotes/origin/HEAD
    remote_prefix = b"refs/remotes/" + origin + b"/"
    if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
        return
    branch_name = origin_head[len(LOCAL_BRANCH_PREFIX) :]
    tracking_ref = remote_prefix + branch_name
    if tracking_ref in refs:
        refs.set_symbolic_ref(remote_prefix + HEADREF, tracking_ref)

1186 

1187 

def _set_default_branch(
    refs: RefsContainer,
    origin: bytes,
    origin_head: bytes,
    branch: bytes,
    ref_message: Optional[bytes],
) -> bytes:
    """Set the default branch.

    When ``branch`` is given it takes precedence: a matching remote-tracking
    branch is copied to a local branch (if that local branch is new),
    otherwise a matching tag is used. Without ``branch``, ``origin_head`` is
    used and created locally from the corresponding remote ref when possible.

    Args:
      refs: refs container to read from and update
      origin: name of the remote
      origin_head: ref name the remote's HEAD points at
      branch: explicitly requested branch or tag name
      ref_message: message passed on to ``add_if_new``
    Returns: the ref name that HEAD should point at
    Raises:
      ValueError: if ``branch`` matches neither a remote branch nor a tag,
        or if both ``branch`` and ``origin_head`` are empty
    """
    remote_prefix = b"refs/remotes/" + origin + b"/"

    if branch:
        remote_branch = remote_prefix + branch
        if remote_branch in refs:
            local_branch = LOCAL_BRANCH_PREFIX + branch
            refs.add_if_new(local_branch, refs[remote_branch], ref_message)
            return local_branch
        if LOCAL_TAG_PREFIX + branch in refs:
            return LOCAL_TAG_PREFIX + branch
        raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")

    if not origin_head:
        raise ValueError("neither origin_head nor branch are provided")

    if origin_head.startswith(LOCAL_BRANCH_PREFIX):
        source_ref = remote_prefix + origin_head[len(LOCAL_BRANCH_PREFIX) :]
    else:
        source_ref = origin_head
    # A missing source ref is not an error: the head ref name is returned
    # regardless of whether it could be created.
    with suppress(KeyError):
        refs.add_if_new(origin_head, refs[source_ref], ref_message)
    return origin_head

1220 

1221 

def _set_head(refs, head_ref, ref_message):
    """Update HEAD to refer to ``head_ref`` and return the resolved value.

    For a ref under the local tag prefix, the existing HEAD entry is deleted
    and HEAD is then set directly to the looked-up value (detached HEAD).
    For any other ref, HEAD is made a symbolic ref to ``head_ref``.

    Args:
      refs: refs container to read from and update
      head_ref: name of the ref HEAD should refer to
      ref_message: message passed on to ``set_if_equals``
    Returns: the value HEAD was set to, or None if a KeyError was raised
      while handling the non-tag case
    """
    if head_ref.startswith(LOCAL_TAG_PREFIX):
        # detach HEAD at specified tag
        head = refs[head_ref]
        # NOTE(review): this branch only runs if the refs lookup returns a
        # Tag object rather than a sha, and ``obj.get_object(obj)`` is called
        # on the second element of ``head.object`` -- confirm whether this
        # path is reachable and whether the call is what was intended.
        if isinstance(head, Tag):
            _cls, obj = head.object
            head = obj.get_object(obj).id
        # HEAD is removed first so that the set_if_equals() below, which
        # expects no existing value (old value None), can succeed.
        del refs[HEADREF]
        refs.set_if_equals(HEADREF, None, head, message=ref_message)
    else:
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            # Raised by the lookup above (or by either update call); the
            # caller is told no head could be resolved.
            head = None
    return head

1240 

1241 

def _import_remote_refs(
    refs_container: RefsContainer,
    remote_name: str,
    refs: Dict[str, str],
    message: Optional[bytes] = None,
    prune: bool = False,
    prune_tags: bool = False,
):
    """Import a remote's branches and tags into ``refs_container``.

    Branches are imported under ``refs/remotes/<remote_name>`` and tags under
    the local tag prefix; peeled entries are ignored.

    NOTE(review): ``refs`` is annotated ``Dict[str, str]`` but its keys are
    matched against bytes prefixes, so callers presumably pass bytes keys --
    confirm and adjust the annotation.

    Args:
      refs_container: container that receives the imported refs
      remote_name: name of the remote; encoded to bytes for the ref path
      refs: mapping of remote ref names to values
      message: message passed on to ``import_refs``
      prune: passed to ``import_refs`` for the branch import
      prune_tags: passed to ``import_refs`` for the tag import
    """
    candidates = strip_peeled_refs(refs)

    remote_branches = {}
    for full_name, value in candidates.items():
        if full_name.startswith(LOCAL_BRANCH_PREFIX):
            remote_branches[full_name[len(LOCAL_BRANCH_PREFIX) :]] = value
    refs_container.import_refs(
        b"refs/remotes/" + remote_name.encode(),
        remote_branches,
        message=message,
        prune=prune,
    )

    remote_tags = {}
    for full_name, value in candidates.items():
        if not full_name.startswith(LOCAL_TAG_PREFIX):
            continue
        if full_name.endswith(PEELED_TAG_SUFFIX):
            continue
        remote_tags[full_name[len(LOCAL_TAG_PREFIX) :]] = value
    refs_container.import_refs(
        LOCAL_TAG_PREFIX, remote_tags, message=message, prune=prune_tags
    )

1270 

1271 

def serialize_refs(store, refs):
    """Build a ref dict that also carries peeled entries for tags.

    Each ref is peeled through ``store``. Refs whose sha cannot be found
    trigger a ``UserWarning`` and are left out. When the unpeeled object is a
    Tag, an extra entry named with the peeled-tag suffix maps to the peeled
    object's id.

    Args:
      store: object store used to peel each sha
      refs: dict of ref name to sha
    Returns: dict of ref name (and suffixed peeled name) to object id
    """
    # TODO: Avoid recursive import :(
    from .object_store import peel_sha

    serialized = {}
    for name, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, sha)
        except KeyError:
            shown_name = name.decode("utf-8", "replace")
            shown_sha = sha.decode("ascii")
            warnings.warn(
                f"ref {shown_name} points at non-present sha {shown_sha}",
                UserWarning,
            )
            continue
        if isinstance(unpeeled, Tag):
            serialized[name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[name] = unpeeled.id
    return serialized