Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/_payload.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

695 statements  

1# Copyright the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4 

5"""Helper classes for low-level Metadata API.""" 

6 

7from __future__ import annotations 

8 

9import abc 

10import fnmatch 

11import hashlib 

12import io 

13import logging 

14import sys 

15from dataclasses import dataclass 

16from datetime import datetime, timezone 

17from typing import ( 

18 IO, 

19 TYPE_CHECKING, 

20 Any, 

21 ClassVar, 

22 TypeVar, 

23) 

24 

25from securesystemslib import exceptions as sslib_exceptions 

26from securesystemslib.signer import Key, Signature 

27 

28from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError 

29 

30if TYPE_CHECKING: 

31 from collections.abc import Iterator 

32 

# Names of the four top-level TUF roles.
_ROOT = "root"
_SNAPSHOT = "snapshot"
_TARGETS = "targets"
_TIMESTAMP = "timestamp"

# Hash algorithm used when the caller does not name one, and the custom
# algorithm name that needs special handling when a digest is created.
_DEFAULT_HASH_ALGORITHM = "sha256"
_BLAKE_HASH_ALGORITHM = "blake2b-256"

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = "1.0.31".split(".")
TOP_LEVEL_ROLE_NAMES = {_ROOT, _SNAPSHOT, _TARGETS, _TIMESTAMP}

logger = logging.getLogger(__name__)

# T is a Generic type constraint for container payloads
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")

50 

51 

def _get_digest(algo: str) -> Any:  # noqa: ANN401
    """Create a new hash object for the algorithm named ``algo``.

    The custom "blake2b-256" name is mapped to ``hashlib.blake2b`` with a
    32 byte digest size. Every other name is handed to ``hashlib.new``
    unchanged.
    """
    if algo != _BLAKE_HASH_ALGORITHM:
        return hashlib.new(algo)

    return hashlib.blake2b(digest_size=32)

58 

59 

def _hash_bytes(data: bytes, algo: str) -> str:
    """Return the hex digest of ``data`` computed with algorithm ``algo``."""
    hasher = _get_digest(algo)
    hasher.update(data)
    return hasher.hexdigest()

66 

67 

def _hash_file(f: IO[bytes], algo: str) -> str:
    """Return the hex digest of the content of ``f`` using ``algo``.

    The file object is rewound to its beginning before it is read.
    """
    f.seek(0)
    if sys.version_info < (3, 11):
        # Fallback for older Pythons. Chunk size is taken from the previously
        # used and now deprecated `securesystemslib.hash.digest_fileobject`.
        hasher = _get_digest(algo)
        while True:
            block = f.read(4096)
            if block == b"":
                break
            hasher.update(block)
    else:
        hasher = hashlib.file_digest(f, lambda: _get_digest(algo))  # type: ignore[arg-type]

    return hasher.hexdigest()

82 

83 

class Signed(metaclass=abc.ABCMeta):
    """A base class for the signed part of TUF metadata.

    Objects with base class Signed are usually included in a ``Metadata``
    object on the signed attribute. This class provides attributes and methods
    that are common for all TUF metadata types (roles).

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. If None, then 1 is assigned.
        spec_version: Supported TUF specification version. If None, then the
            version currently supported by the library is assigned.
        expires: Metadata expiry date in UTC timezone. If None, then current
            date and time is assigned.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    # type is required for static reference without changing the API
    type: ClassVar[str] = "signed"

    # _type and type are identical: 1st replicates file format, 2nd passes lint
    @property
    def _type(self) -> str:
        return self.type

    @property
    def expires(self) -> datetime:
        """Get the metadata expiry date."""
        return self._expires

    @expires.setter
    def expires(self, value: datetime) -> None:
        """Set the metadata expiry date.

        Microseconds are dropped. A naive datetime is interpreted as UTC; a
        timezone-aware datetime must already be in UTC.

        # Use 'datetime' module to e.g. expire in seven days from now
        obj.expires = datetime.now(timezone.utc) + timedelta(days=7)

        Raises:
            ValueError: ``value`` is timezone-aware but not in UTC.
        """
        expires = value.replace(microsecond=0)
        if expires.tzinfo is None:
            # Naive datetime: just make it UTC
            expires = expires.replace(tzinfo=timezone.utc)
        elif expires.tzinfo != timezone.utc:
            raise ValueError(f"Expected tz UTC, not {expires.tzinfo}")

        # Assign only after validation, so that a rejected value does not
        # replace the previously stored expiry date.
        self._expires = expires

    # NOTE: Signed is a stupid name, because this might not be signed yet, but
    # we keep it to match spec terminology (I often refer to this as "payload",
    # or "inner metadata")
    def __init__(
        self,
        version: int | None,
        spec_version: str | None,
        expires: datetime | None,
        unrecognized_fields: dict[str, Any] | None,
    ):
        if spec_version is None:
            spec_version = ".".join(SPECIFICATION_VERSION)
        # Accept semver (X.Y.Z) but also X.Y for legacy compatibility
        spec_list = spec_version.split(".")
        if len(spec_list) not in [2, 3] or not all(
            el.isdigit() for el in spec_list
        ):
            raise ValueError(f"Failed to parse spec_version {spec_version}")

        # major version must match
        if spec_list[0] != SPECIFICATION_VERSION[0]:
            raise ValueError(f"Unsupported spec_version {spec_version}")

        self.spec_version = spec_version

        # Goes through the property setter above (normalization + validation)
        self.expires = expires or datetime.now(timezone.utc)

        if version is None:
            version = 1
        elif version <= 0:
            raise ValueError(f"version must be > 0, got {version}")
        self.version = version

        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Signed):
            return False

        return (
            self.type == other.type
            and self.version == other.version
            and self.spec_version == other.spec_version
            and self.expires == other.expires
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # ``unrecognized_fields`` is a dict (unhashable) and is deliberately
        # left out: hashing it would always raise TypeError. Objects that
        # compare equal still hash equal, because every field used here is
        # also compared in ``__eq__``.
        return hash(
            (
                self.type,
                self.version,
                self.spec_version,
                self.expires,
            )
        )

    @abc.abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Serialize and return a dict representation of self."""
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Signed:
        """Deserialization helper, creates object from json/dict
        representation.
        """
        raise NotImplementedError

    @classmethod
    def _common_fields_from_dict(
        cls, signed_dict: dict[str, Any]
    ) -> tuple[int, str, datetime]:
        """Remove the common fields of ``Signed`` instances from the passed
        dict representation and return them as an ordered tuple to be passed
        as leading positional arguments to a subclass constructor.

        NOTE: ``signed_dict`` is modified in place, the common fields are
        popped from it.

        See ``{Root, Timestamp, Snapshot, Targets}.from_dict``
        methods for usage.

        Raises:
            ValueError: The "_type" field does not match ``cls.type`` or the
                "expires" field does not have the expected format.
            KeyError: A common field is missing.
        """
        _type = signed_dict.pop("_type")
        if _type != cls.type:
            raise ValueError(f"Expected type {cls.type}, got {_type}")

        version = signed_dict.pop("version")
        spec_version = signed_dict.pop("spec_version")
        expires_str = signed_dict.pop("expires")
        # Convert 'expires' TUF metadata string to a datetime object, which is
        # what the constructor expects and what we store. The inverse operation
        # is implemented in '_common_fields_to_dict'.
        expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ").replace(
            tzinfo=timezone.utc
        )

        return version, spec_version, expires

    def _common_fields_to_dict(self) -> dict[str, Any]:
        """Return a dict representation of common fields of
        ``Signed`` instances.

        See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage.

        """
        return {
            "_type": self._type,
            "version": self.version,
            "spec_version": self.spec_version,
            "expires": self.expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
            **self.unrecognized_fields,
        }

    def is_expired(self, reference_time: datetime | None = None) -> bool:
        """Check metadata expiration against a reference time.

        Args:
            reference_time: Time to check expiration date against. A naive
                datetime is interpreted as UTC (same rule as the ``expires``
                setter). Default is current UTC date and time.

        Returns:
            ``True`` if expiration time is less than or equal to the
            reference time.
        """
        if reference_time is None:
            reference_time = datetime.now(timezone.utc)
        elif reference_time.tzinfo is None:
            # ``self.expires`` is always timezone-aware; comparing it with a
            # naive datetime would raise TypeError.
            reference_time = reference_time.replace(tzinfo=timezone.utc)

        return reference_time >= self.expires

265 

266 

class Role:
    """Container that defines which keys are required to sign roles metadata.

    Role defines how many keys are required to successfully sign the roles
    metadata, and which keys are accepted.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keyids: Roles signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if len(set(keyids)) != len(keyids):
            raise ValueError(f"Nonunique keyids: {keyids}")
        if threshold < 1:
            raise ValueError("threshold should be at least 1!")
        self.keyids = keyids
        self.threshold = threshold
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Role):
            return False

        return (
            self.keyids == other.keyids
            and self.threshold == other.threshold
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # ``keyids`` is a list and ``unrecognized_fields`` a dict: hashing
        # them directly always raises TypeError. The list is frozen into a
        # tuple and the dict is left out; equal roles still hash equal because
        # both remaining fields are compared in ``__eq__``.
        return hash((tuple(self.keyids), self.threshold))

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> Role:
        """Create ``Role`` object from its json/dict representation.

        NOTE: ``role_dict`` is modified in place; whatever is left in it after
        the known fields are popped is stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        return {
            "keyids": self.keyids,
            "threshold": self.threshold,
            **self.unrecognized_fields,
        }

335 

336 

@dataclass
class VerificationResult:
    """Outcome of verifying the signatures of a delegated role's metadata.

    An instance is truthy exactly when ``verified`` is true.

    Attributes:
        threshold: Number of required signatures.
        signed: dict of keyid to Key, containing keys that have signed.
        unsigned: dict of keyid to Key, containing keys that have not signed.
    """

    threshold: int
    signed: dict[str, Key]
    unsigned: dict[str, Key]

    @property
    def verified(self) -> bool:
        """True if the number of signing keys reaches the threshold."""
        return self.threshold <= len(self.signed)

    @property
    def missing(self) -> int:
        """Number of signatures still needed to reach the threshold
        (0 when the threshold is already met).
        """
        shortfall = self.threshold - len(self.signed)
        return shortfall if shortfall > 0 else 0

    def __bool__(self) -> bool:
        return self.verified

363 

364 

@dataclass
class RootVerificationResult:
    """Outcome of verifying the signatures of root metadata.

    Root must be verified by itself and the previous root version. This
    dataclass bundles both results. For the edge case of first version
    of root, these underlying results are identical.

    Note that `signed` and `unsigned` correctness requires the underlying
    VerificationResult keys to not conflict (no reusing the same keyid for
    different keys).

    An instance is truthy exactly when ``verified`` is true.

    Attributes:
        first: First underlying VerificationResult
        second: Second underlying VerificationResult
    """

    first: VerificationResult
    second: VerificationResult

    @property
    def verified(self) -> bool:
        """True if both underlying VerificationResults have met their
        signature threshold.
        """
        if not self.first.verified:
            return False
        return self.second.verified

    @property
    def signed(self) -> dict[str, Key]:
        """Merged dictionary of the keys that have signed in either of the
        underlying VerificationResults (entries of ``second`` win on
        duplicate keyids).
        """
        return {**self.first.signed, **self.second.signed}

    @property
    def unsigned(self) -> dict[str, Key]:
        """Merged dictionary of the keys that have not signed in either of
        the underlying VerificationResults (entries of ``second`` win on
        duplicate keyids).
        """
        return {**self.first.unsigned, **self.second.unsigned}

    def __bool__(self) -> bool:
        return self.verified

408 

409 

class _DelegatorMixin(metaclass=abc.ABCMeta):
    """Mixin providing signature threshold verification of delegated roles.

    Implementers supply ``get_delegated_role()`` and ``get_key()``; the mixin
    builds ``get_verification_result()`` and ``verify_delegate()`` on top.
    """

    @abc.abstractmethod
    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        raise NotImplementedError

    def get_verification_result(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> VerificationResult:
        """Return signature threshold verification result for delegated role.

        Every keyid of the role is sorted into "signed" (a signature exists
        and verifies over ``payload``) or "unsigned" (no signature, or the
        signature fails to verify). Keyids for which the delegator has no key
        are logged and appear in neither group.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        role metadata is not fully verified.

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``delegated_role``.
        """
        role = self.get_delegated_role(delegated_role)

        verified_keys: dict[str, Key] = {}
        unverified_keys: dict[str, Key] = {}

        for keyid in role.keyids:
            try:
                key = self.get_key(keyid)
            except ValueError:
                logger.info("No key for keyid %s", keyid)
                continue

            if keyid in signatures:
                try:
                    key.verify_signature(signatures[keyid], payload)
                except sslib_exceptions.UnverifiedSignatureError:
                    unverified_keys[keyid] = key
                    logger.info(
                        "Key %s failed to verify %s", keyid, delegated_role
                    )
                else:
                    verified_keys[keyid] = key
            else:
                unverified_keys[keyid] = key
                logger.info("No signature for keyid %s", keyid)

        return VerificationResult(
            role.threshold, verified_keys, unverified_keys
        )

    def verify_delegate(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> None:
        """Verify signature threshold for delegated role.

        Verify that there are enough valid ``signatures`` over ``payload``, to
        meet the threshold of keys for ``delegated_role``, as defined by the
        delegator (``self``).

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            UnsignedMetadataError: ``delegated_role`` was not signed with
                required threshold of keys.
            ValueError: no delegation was found for ``delegated_role``.
        """
        outcome = self.get_verification_result(
            delegated_role, payload, signatures
        )
        if outcome:
            return

        signed_count = len(outcome.signed)
        raise UnsignedMetadataError(
            f"{delegated_role} was signed by {signed_count}/"
            f"{outcome.threshold} keys"
        )

505 

506 

class Root(Signed, _DelegatorMixin):
    """A container for the signed part of root metadata.

    Parameters listed below are also instance attributes.

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
            Default is empty dictionary.
        roles: Dictionary of role names to Roles. Defines which keys are
            required to sign the metadata for a specific role. Default is
            a dictionary of top level roles without keys and threshold of 1.
        consistent_snapshot: ``True`` if repository supports consistent
            snapshots. Default is True.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _ROOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        keys: dict[str, Key] | None = None,
        roles: dict[str, Role] | None = None,
        consistent_snapshot: bool | None = True,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.consistent_snapshot = consistent_snapshot
        self.keys = keys if keys is not None else {}

        if roles is None:
            roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES}
        elif set(roles) != TOP_LEVEL_ROLE_NAMES:
            raise ValueError("Role names must be the top-level metadata roles")
        self.roles = roles

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Root):
            return False

        return (
            super().__eq__(other)
            and self.keys == other.keys
            and self.roles == other.roles
            and self.consistent_snapshot == other.consistent_snapshot
        )

    def __hash__(self) -> int:
        # The ``keys``, ``roles`` and ``unrecognized_fields`` dicts are
        # unhashable, so hashing them directly always raises TypeError.
        # Instead hash immutable summaries of the fields compared in
        # ``__eq__``: equal objects still hash equal. ``unrecognized_fields``
        # (arbitrary content) is left out.
        role_summary = frozenset(
            (name, tuple(role.keyids), role.threshold)
            for name, role in self.roles.items()
        )
        return hash(
            (
                self.type,
                self.version,
                self.spec_version,
                self.expires,
                self.consistent_snapshot,
                frozenset(self.keys),
                role_summary,
            )
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Root:
        """Create ``Root`` object from its json/dict representation.

        NOTE: ``signed_dict`` is modified in place; whatever is left in it
        after the known fields are popped is stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        consistent_snapshot = signed_dict.pop("consistent_snapshot", None)
        keys = {
            keyid: Key.from_dict(keyid, key_dict)
            for keyid, key_dict in signed_dict.pop("keys").items()
        }
        roles = {
            role_name: Role.from_dict(role_dict)
            for role_name, role_dict in signed_dict.pop("roles").items()
        }

        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, keys, roles, consistent_snapshot, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        root_dict = self._common_fields_to_dict()
        # "consistent_snapshot" is optional: it is only written when set
        if self.consistent_snapshot is not None:
            root_dict["consistent_snapshot"] = self.consistent_snapshot

        root_dict["keys"] = {
            keyid: key.to_dict() for keyid, key in self.keys.items()
        }
        root_dict["roles"] = {
            role_name: role.to_dict() for role_name, role in self.roles.items()
        }
        return root_dict

    def add_key(self, key: Key, role: str) -> None:
        """Add new signing key for delegated role ``role``.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if ``role`` doesn't
                exist.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if key.keyid not in self.roles[role].keyids:
            self.roles[role].keyids.append(key.keyid)
        self.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str) -> None:
        """Revoke key from ``role`` and updates the key store.

        The key itself is removed from ``keys`` only when no role uses it
        anymore.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If ``role`` doesn't exist or if ``role`` doesn't include
                the key.
        """
        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if keyid not in self.roles[role].keyids:
            raise ValueError(f"Key with id {keyid} is not used by {role}")
        self.roles[role].keyids.remove(keyid)

        if any(keyid in keyinfo.keyids for keyinfo in self.roles.values()):
            # Still in use by another role: keep the key
            return

        # A role may list a keyid that has no entry in ``keys``; tolerate that
        # instead of raising KeyError after the role was already modified.
        self.keys.pop(keyid, None)

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if delegated_role not in self.roles:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return self.roles[delegated_role]

    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        if keyid not in self.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.keys[keyid]

    def get_root_verification_result(
        self,
        previous: Root | None,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> RootVerificationResult:
        """Return signature threshold verification result for two root roles.

        Verify root metadata with two roles (`self` and optionally `previous`).

        If the repository has no root role versions yet, `previous` can be left
        None. In all other cases, `previous` must be the previous version of
        the Root.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        root metadata is not fully verified.

        Args:
            previous: The previous `Root` to verify payload with, or None
            payload: Signed payload bytes for root
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``root`` or given Root
                versions are not sequential.
        """

        if previous is None:
            # First root version: it is verified against itself twice
            previous = self
        elif self.version != previous.version + 1:
            versions = f"v{previous.version} and v{self.version}"
            raise ValueError(
                f"Expected sequential root versions, got {versions}."
            )

        return RootVerificationResult(
            previous.get_verification_result(Root.type, payload, signatures),
            self.get_verification_result(Root.type, payload, signatures),
        )

711 

712 

class BaseFile:
    """A base class of ``MetaFile`` and ``TargetFile``.

    Bundles the static helpers used to validate, verify and calculate the
    length and hashes of data given as bytes or as a file object.
    """

    @staticmethod
    def _verify_hashes(
        data: bytes | IO[bytes], expected_hashes: dict[str, str]
    ) -> None:
        """Check ``data`` against every algorithm/digest pair in
        ``expected_hashes``.

        Raises:
            LengthOrHashMismatchError: An algorithm is not supported or an
                observed hash differs from the expected one.
        """
        for algo, exp_hash in expected_hashes.items():
            try:
                if not isinstance(data, bytes):
                    # if data is not bytes, assume it is a file object
                    actual_hash = _hash_file(data, algo)
                else:
                    actual_hash = _hash_bytes(data, algo)
            except (ValueError, TypeError) as e:
                raise LengthOrHashMismatchError(
                    f"Unsupported algorithm '{algo}'"
                ) from e

            if actual_hash == exp_hash:
                continue

            raise LengthOrHashMismatchError(
                f"Observed hash {actual_hash} does not match "
                f"expected hash {exp_hash}"
            )

    @staticmethod
    def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None:
        """Check that ``data`` is exactly ``expected_length`` bytes long.

        Raises:
            LengthOrHashMismatchError: The observed length differs.
        """
        if not isinstance(data, bytes):
            # if data is not bytes, assume it is a file object: its length is
            # the stream position after seeking to the end
            data.seek(0, io.SEEK_END)
            actual_length = data.tell()
        else:
            actual_length = len(data)

        if actual_length == expected_length:
            return

        raise LengthOrHashMismatchError(
            f"Observed length {actual_length} does not match "
            f"expected length {expected_length}"
        )

    @staticmethod
    def _validate_hashes(hashes: dict[str, str]) -> None:
        """Validate that ``hashes`` is a non-empty dict of str to str.

        Raises:
            ValueError: ``hashes`` is empty.
            TypeError: A key or a value is not a string.
        """
        if not hashes:
            raise ValueError("Hashes must be a non empty dictionary")

        only_strings = all(
            isinstance(name, str) and isinstance(digest, str)
            for name, digest in hashes.items()
        )
        if not only_strings:
            raise TypeError("Hashes items must be strings")

    @staticmethod
    def _validate_length(length: int) -> None:
        """Validate that ``length`` is not negative.

        Raises:
            ValueError: ``length`` is below zero.
        """
        if length < 0:
            raise ValueError(f"Length must be >= 0, got {length}")

    @staticmethod
    def _get_length_and_hashes(
        data: bytes | IO[bytes], hash_algorithms: list[str] | None
    ) -> tuple[int, dict[str, str]]:
        """Calculate length and hashes of ``data``.

        With ``hash_algorithms`` set to None only the default algorithm is
        used.

        Raises:
            ValueError: An algorithm is not supported.
        """
        is_bytes = isinstance(data, bytes)
        if is_bytes:
            length = len(data)
        else:
            data.seek(0, io.SEEK_END)
            length = data.tell()

        if hash_algorithms is None:
            hash_algorithms = [_DEFAULT_HASH_ALGORITHM]

        hashes = {}
        for algorithm in hash_algorithms:
            try:
                if is_bytes:
                    hashes[algorithm] = _hash_bytes(data, algorithm)
                else:
                    hashes[algorithm] = _hash_file(data, algorithm)
            except (ValueError, TypeError) as e:
                raise ValueError(f"Unsupported algorithm '{algorithm}'") from e

        return (length, hashes)

797 

798 

class MetaFile(BaseFile):
    """A container with information about a particular metadata file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Version of the metadata file.
        length: Length of the metadata file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the metadata
            file content.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        version: int = 1,
        length: int | None = None,
        hashes: dict[str, str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if version <= 0:
            raise ValueError(f"Metafile version must be > 0, got {version}")
        # length and hashes are optional: validate them only when given
        if length is not None:
            self._validate_length(length)
        if hashes is not None:
            self._validate_hashes(hashes)

        self.version = version
        self.length = length
        self.hashes = hashes
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, MetaFile):
            return False

        return (
            self.version == other.version
            and self.length == other.length
            and self.hashes == other.hashes
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # ``hashes`` and ``unrecognized_fields`` are dicts: hashing them
        # directly always raises TypeError. ``hashes`` (str to str) is frozen
        # into a frozenset of items, ``unrecognized_fields`` (arbitrary
        # content) is left out. Equal objects still hash equal, because all
        # fields used here are compared in ``__eq__``.
        hashes = None
        if self.hashes is not None:
            hashes = frozenset(self.hashes.items())

        return hash((self.version, self.length, hashes))

    @classmethod
    def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile:
        """Create ``MetaFile`` object from its json/dict representation.

        NOTE: ``meta_dict`` is modified in place; whatever is left in it after
        the known fields are popped is stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        version = meta_dict.pop("version")
        length = meta_dict.pop("length", None)
        hashes = meta_dict.pop("hashes", None)

        # All fields left in the meta_dict are unrecognized.
        return cls(version, length, hashes, meta_dict)

    @classmethod
    def from_data(
        cls,
        version: int,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> MetaFile:
        """Creates MetaFile object from bytes.

        This constructor should only be used if hashes are wanted.
        By default, MetaFile(ver) should be used.

        Args:
            version: Version of the metadata file.
            data: Metadata bytes that the metafile represents.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, "sha256" is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(version, length, hashes)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        res_dict: dict[str, Any] = {
            "version": self.version,
            **self.unrecognized_fields,
        }

        # Optional fields are only written when they are set
        if self.length is not None:
            res_dict["length"] = self.length

        if self.hashes is not None:
            res_dict["hashes"] = self.hashes

        return res_dict

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that the length and hashes of ``data`` match expected values.

        A check is skipped if the respective expected value (``length`` or
        ``hashes``) is None.

        Args:
            data: File object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        if self.length is not None:
            self._verify_length(data, self.length)

        if self.hashes is not None:
            self._verify_hashes(data, self.hashes)

922 

923 

class Timestamp(Signed):
    """A container for the signed part of timestamp metadata.

    TUF file format uses a dictionary to contain the snapshot information:
    this is not the case with ``Timestamp.snapshot_meta`` which is a
    ``MetaFile``.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        snapshot_meta: Meta information for snapshot metadata. Default is a
            MetaFile with version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TIMESTAMP

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        snapshot_meta: MetaFile | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.snapshot_meta = snapshot_meta if snapshot_meta else MetaFile(1)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Timestamp):
            return False

        if not super().__eq__(other):
            return False

        return self.snapshot_meta == other.snapshot_meta

    def __hash__(self) -> int:
        # Combines the base class hash with the hash of the snapshot MetaFile
        base_hash = super().__hash__()
        return hash((base_hash, self.snapshot_meta))

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp:
        """Create ``Timestamp`` object from its json/dict representation.

        NOTE: ``signed_dict`` is modified in place; whatever is left in it
        after the known fields are popped is stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        version, spec_version, expires = cls._common_fields_from_dict(
            signed_dict
        )
        # The file format wraps the snapshot info in a one-entry "meta" dict
        snapshot_dict = signed_dict.pop("meta")["snapshot.json"]
        snapshot_meta = MetaFile.from_dict(snapshot_dict)
        # All fields left in the signed_dict are unrecognized.
        return cls(version, spec_version, expires, snapshot_meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        timestamp_dict = self._common_fields_to_dict()
        snapshot_dict = self.snapshot_meta.to_dict()
        timestamp_dict["meta"] = {"snapshot.json": snapshot_dict}
        return timestamp_dict

990 

991 

class Snapshot(Signed):
    """A container for the signed part of snapshot metadata.

    Snapshot contains information about all target Metadata files.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API
        meta: Dictionary of targets filenames to ``MetaFile`` objects. Default
            is a dictionary with a ``MetaFile`` for "targets.json" version 1.

    Raises:
        ValueError: Invalid arguments.
    """

    type = _SNAPSHOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        meta: dict[str, MetaFile] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        # An explicitly passed empty dict is kept as is; only ``None``
        # triggers the default.
        self.meta = meta if meta is not None else {"targets.json": MetaFile(1)}

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Snapshot):
            return False

        return super().__eq__(other) and self.meta == other.meta

    def __hash__(self) -> int:
        # A dict cannot be a member of a hashed tuple, so hash the set of
        # meta file names instead. Equal snapshots have equal ``meta`` dicts
        # and therefore equal key sets, which keeps this consistent with
        # ``__eq__`` without requiring the ``MetaFile`` values to be hashable.
        return hash((super().__hash__(), frozenset(self.meta)))

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot:
        """Create ``Snapshot`` object from its json/dict representation.

        The recognized entries are popped from ``signed_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        meta_dicts = signed_dict.pop("meta")
        meta = {
            meta_path: MetaFile.from_dict(meta_dict)
            for meta_path, meta_dict in meta_dicts.items()
        }
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        snapshot_dict = self._common_fields_to_dict()
        snapshot_dict["meta"] = {
            meta_path: meta_info.to_dict()
            for meta_path, meta_info in self.meta.items()
        }
        return snapshot_dict

1060 

1061 

class DelegatedRole(Role):
    """A container with information about a delegated role.

    A delegation can happen in two ways:

        - ``paths`` is set: delegates targets matching any path pattern in
          ``paths``
        - ``path_hash_prefixes`` is set: delegates targets whose target path
          hash starts with any of the prefixes in ``path_hash_prefixes``

        ``paths`` and ``path_hash_prefixes`` are mutually exclusive:
        both cannot be set, at least one of them must be set.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        name: Delegated role name.
        keyids: Delegated role signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        terminating: ``True`` if this delegation terminates a target lookup.
        paths: Path patterns. See note above.
        path_hash_prefixes: Hash prefixes. See note above.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        name: str,
        keyids: list[str],
        threshold: int,
        terminating: bool,
        paths: list[str] | None = None,
        path_hash_prefixes: list[str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(keyids, threshold, unrecognized_fields)
        self.name = name
        self.terminating = terminating

        # Exactly one of the two delegation mechanisms has to be chosen.
        exclusive_vars = [paths, path_hash_prefixes]
        if sum(1 for var in exclusive_vars if var is not None) != 1:
            raise ValueError(
                "Only one of (paths, path_hash_prefixes) must be set"
            )

        if paths is not None and any(not isinstance(p, str) for p in paths):
            raise ValueError("Paths must be strings")
        if path_hash_prefixes is not None and any(
            not isinstance(p, str) for p in path_hash_prefixes
        ):
            raise ValueError("Path_hash_prefixes must be strings")

        self.paths = paths
        self.path_hash_prefixes = path_hash_prefixes

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DelegatedRole):
            return False

        return (
            super().__eq__(other)
            and self.name == other.name
            and self.terminating == other.terminating
            and self.paths == other.paths
            and self.path_hash_prefixes == other.path_hash_prefixes
        )

    def __hash__(self) -> int:
        # ``paths`` and ``path_hash_prefixes`` are lists (or None). Lists are
        # unhashable, so they are converted to tuples, which keeps the
        # element order that ``__eq__`` also compares.
        paths = None if self.paths is None else tuple(self.paths)
        prefixes = (
            None
            if self.path_hash_prefixes is None
            else tuple(self.path_hash_prefixes)
        )
        return hash(
            (
                super().__hash__(),
                self.name,
                self.terminating,
                paths,
                prefixes,
            )
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole:
        """Create ``DelegatedRole`` object from its json/dict representation.

        The recognized entries are popped from ``role_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        name = role_dict.pop("name")
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        terminating = role_dict.pop("terminating")
        paths = role_dict.pop("paths", None)
        path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
        # All fields left in the role_dict are unrecognized.
        return cls(
            name,
            keyids,
            threshold,
            terminating,
            paths,
            path_hash_prefixes,
            role_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        res_dict = {
            "name": self.name,
            "terminating": self.terminating,
            **base_role_dict,
        }
        if self.paths is not None:
            res_dict["paths"] = self.paths
        elif self.path_hash_prefixes is not None:
            res_dict["path_hash_prefixes"] = self.path_hash_prefixes
        return res_dict

    @staticmethod
    def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool:
        """Determine whether ``targetpath`` matches the ``pathpattern``."""
        # We need to make sure that targetpath and pathpattern are pointing to
        # the same directory as fnmatch doesn't treat "/" as a special symbol.
        target_parts = targetpath.split("/")
        pattern_parts = pathpattern.split("/")
        if len(target_parts) != len(pattern_parts):
            return False

        # Every part in the pathpattern could include a glob pattern, that's
        # why each of the target and pathpattern parts should match.
        return all(
            fnmatch.fnmatchcase(target, pattern)
            for target, pattern in zip(
                target_parts, pattern_parts, strict=True
            )
        )

    def is_delegated_path(self, target_filepath: str) -> bool:
        """Determine whether the given ``target_filepath`` is in one of
        the paths that ``DelegatedRole`` is trusted to provide.

        The ``target_filepath`` and the ``DelegatedRole`` paths are expected to
        be in their canonical forms, so e.g. "a/b" instead of "a//b" . Only "/"
        is supported as target path separator. Leading separators are not
        handled as special cases (see `TUF specification on targetpath
        <https://theupdateframework.github.io/specification/latest/#targetpath>`_).

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.

        Returns:
            ``True`` if the path matches one of the hash prefixes or path
            patterns of this role, ``False`` otherwise.
        """
        if self.path_hash_prefixes is not None:
            # Calculate the hash of the filepath
            # to determine in which bin to find the target.
            target_filepath_hash = hashlib.sha256(
                target_filepath.encode("utf-8")
            ).hexdigest()
            return any(
                target_filepath_hash.startswith(path_hash_prefix)
                for path_hash_prefix in self.path_hash_prefixes
            )

        if self.paths is not None:
            # A delegated role path may be an explicit path or glob
            # pattern (Unix shell-style wildcards).
            return any(
                self._is_target_in_pathpattern(target_filepath, pathpattern)
                for pathpattern in self.paths
            )

        return False

1234 

1235 

class SuccinctRoles(Role):
    """Succinctly defines a hash bin delegation graph.

    A ``SuccinctRoles`` object describes a delegation graph that covers all
    targets, distributing them uniformly over the delegated roles (i.e. bins)
    in the graph.

    The total number of bins is 2 to the power of the passed ``bit_length``.

    Bin names are the concatenation of the passed ``name_prefix`` and a
    zero-padded hex representation of the bin index separated by a hyphen.

    The passed ``keyids`` and ``threshold`` is used for each bin, and each bin
    is 'terminating'.

    For details: https://github.com/theupdateframework/taps/blob/master/tap15.md

    Args:
        keyids: Signing key identifiers for any bin metadata.
        threshold: Number of keys required to sign any bin metadata.
        bit_length: Number of bits between 1 and 32.
        name_prefix: Prefix of all bin names.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError, TypeError, AttributeError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        bit_length: int,
        name_prefix: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(keyids, threshold, unrecognized_fields)

        if bit_length <= 0 or bit_length > 32:
            raise ValueError("bit_length must be between 1 and 32")
        if not isinstance(name_prefix, str):
            raise ValueError("name_prefix must be a string")

        self.bit_length = bit_length
        self.name_prefix = name_prefix

        # Calculate the suffix_len value based on the total number of bins in
        # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will
        # have a suffix between "000" and "3ff" in hex and suffix_len will be 3
        # meaning the third bin will have a suffix of "003".
        self.number_of_bins = 2**bit_length
        # suffix_len is calculated based on "number_of_bins - 1" as the name
        # of the last bin contains the number "number_of_bins -1" as a suffix.
        self.suffix_len = len(f"{self.number_of_bins - 1:x}")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SuccinctRoles):
            return False

        return (
            super().__eq__(other)
            and self.bit_length == other.bit_length
            and self.name_prefix == other.name_prefix
        )

    def __hash__(self) -> int:
        return hash((super().__hash__(), self.bit_length, self.name_prefix))

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles:
        """Create ``SuccinctRoles`` object from its json/dict representation.

        The recognized entries are popped from ``role_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, AttributeError, TypeError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        bit_length = role_dict.pop("bit_length")
        name_prefix = role_dict.pop("name_prefix")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, bit_length, name_prefix, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        return {
            "bit_length": self.bit_length,
            "name_prefix": self.name_prefix,
            **base_role_dict,
        }

    def _bin_name(self, bin_index: int) -> str:
        """Return the role name of the bin with index ``bin_index``."""
        # Zero-pad the lowercase hex index to the common suffix length.
        return f"{self.name_prefix}-{bin_index:0{self.suffix_len}x}"

    def get_role_for_target(self, target_filepath: str) -> str:
        """Calculate the name of the delegated role responsible for
        ``target_filepath``.

        The target at path ``target_filepath`` is assigned to a bin by casting
        the left-most ``bit_length`` of bits of the file path hash digest to
        int, using it as bin index between 0 and ``2**bit_length - 1``.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        hasher = hashlib.new(name="sha256")
        hasher.update(target_filepath.encode("utf-8"))

        # We can't ever need more than 4 bytes (32 bits).
        hash_bytes = hasher.digest()[:4]
        # Right shift hash bytes, so that we only have the leftmost
        # bit_length bits that we care about.
        shift_value = 32 - self.bit_length
        bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value
        return self._bin_name(bin_number)

    def get_roles(self) -> Iterator[str]:
        """Yield the names of all different delegated roles one by one."""
        for i in range(self.number_of_bins):
            yield self._bin_name(i)

    def is_delegated_role(self, role_name: str) -> bool:
        """Determine whether the given ``role_name`` is in one of
        the delegated roles that ``SuccinctRoles`` represents.

        Only names that ``get_roles()`` can yield are accepted: the prefix,
        a hyphen and a zero-padded lowercase hex bin index.

        Args:
            role_name: The name of the role to check against.
        """
        desired_prefix = self.name_prefix + "-"

        if not role_name.startswith(desired_prefix):
            return False

        suffix = role_name[len(desired_prefix) :]
        if len(suffix) != self.suffix_len:
            return False

        try:
            # make sure suffix is hex value
            num = int(suffix, 16)
        except ValueError:
            return False

        if not 0 <= num < self.number_of_bins:
            return False

        # int() also accepts signs, surrounding whitespace, underscores, a
        # "0x" prefix and uppercase digits. None of those appear in real bin
        # names, so require the suffix to round-trip to the canonical form.
        return suffix == f"{num:0{self.suffix_len}x}"

1382 

1383 

class Delegations:
    """A container object storing information about all delegations.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
        roles: Ordered dictionary of role names to DelegatedRoles instances. It
            defines which keys are required to sign the metadata for a specific
            role. The roles order also defines the order that role delegations
            are considered during target searches.
        succinct_roles: Contains succinct information about hash bin
            delegations. Note that succinct roles is not a TUF specification
            feature yet and setting `succinct_roles` to a value makes the
            resulting metadata non-compliant. The metadata will not be accepted
            as valid by specification compliant clients such as those built with
            python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Exactly one of ``roles`` and ``succinct_roles`` must be set.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keys: dict[str, Key],
        roles: dict[str, DelegatedRole] | None = None,
        succinct_roles: SuccinctRoles | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self.keys = keys
        if sum(1 for v in [roles, succinct_roles] if v is not None) != 1:
            raise ValueError("One of roles and succinct_roles must be set")

        if roles is not None:
            for role in roles:
                if not role or role in TOP_LEVEL_ROLE_NAMES:
                    raise ValueError(
                        "Delegated roles cannot be empty or use top-level "
                        "role names"
                    )

        self.roles = roles
        self.succinct_roles = succinct_roles
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Delegations):
            return False

        all_attributes_check = (
            self.keys == other.keys
            and self.roles == other.roles
            and self.succinct_roles == other.succinct_roles
            and self.unrecognized_fields == other.unrecognized_fields
        )

        if self.roles is not None and other.roles is not None:
            all_attributes_check = (
                all_attributes_check
                # Order of the delegated roles matters (see issue #1788).
                and list(self.roles.items()) == list(other.roles.items())
            )

        return all_attributes_check

    def __hash__(self) -> int:
        # Dicts are unhashable, so hash stable projections of them instead.
        # Each projection is equal for objects that compare equal, which is
        # all the hash contract requires:
        # - keys / unrecognized_fields: dict equality ignores order, so a
        #   frozenset of the dict keys is used (the values may themselves be
        #   unhashable, e.g. nested dicts in unrecognized_fields).
        # - roles: __eq__ compares role order, so the role names are kept
        #   as an ordered tuple.
        role_names = None if self.roles is None else tuple(self.roles)
        return hash(
            (
                frozenset(self.keys),
                role_names,
                self.succinct_roles,
                frozenset(self.unrecognized_fields),
            )
        )

    @classmethod
    def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations:
        """Create ``Delegations`` object from its json/dict representation.

        The recognized entries are popped from ``delegations_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        keys = delegations_dict.pop("keys")
        keys_res = {
            keyid: Key.from_dict(keyid, key_dict)
            for keyid, key_dict in keys.items()
        }
        roles = delegations_dict.pop("roles", None)
        roles_res: dict[str, DelegatedRole] | None = None

        if roles is not None:
            roles_res = {}
            for role_dict in roles:
                new_role = DelegatedRole.from_dict(role_dict)
                # Role names are dict keys here, so a repeated name would
                # silently overwrite the earlier role; reject it instead.
                if new_role.name in roles_res:
                    raise ValueError(f"Duplicate role {new_role.name}")
                roles_res[new_role.name] = new_role

        succinct_roles_dict = delegations_dict.pop("succinct_roles", None)
        succinct_roles_info = None
        if succinct_roles_dict is not None:
            succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict)

        # All fields left in the delegations_dict are unrecognized.
        return cls(keys_res, roles_res, succinct_roles_info, delegations_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
        res_dict: dict[str, Any] = {
            "keys": keys,
            **self.unrecognized_fields,
        }
        if self.roles is not None:
            roles = [role_obj.to_dict() for role_obj in self.roles.values()]
            res_dict["roles"] = roles
        elif self.succinct_roles is not None:
            res_dict["succinct_roles"] = self.succinct_roles.to_dict()

        return res_dict

    def get_roles_for_target(
        self, target_filepath: str
    ) -> Iterator[tuple[str, bool]]:
        """Given ``target_filepath`` get names and terminating status of all
        delegated roles who are responsible for it.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.

        Yields:
            ``(role name, terminating)`` tuples, in the order of ``roles``
            when ``roles`` is set, or a single tuple for the matching bin
            when ``succinct_roles`` is set.
        """
        if self.roles is not None:
            for role in self.roles.values():
                if role.is_delegated_path(target_filepath):
                    yield role.name, role.terminating

        elif self.succinct_roles is not None:
            # We consider all succinct_roles as terminating.
            # For more information read TAP 15.
            yield self.succinct_roles.get_role_for_target(target_filepath), True

1531 

1532 

class TargetFile(BaseFile):
    """A container with information about a particular target file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        length: Length of the target file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the target
            file content.
        path: URL path to a target file, relative to a base targets URL.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        length: int,
        hashes: dict[str, str],
        path: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self._validate_length(length)
        self._validate_hashes(hashes)

        self.length = length
        self.hashes = hashes
        self.path = path
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    @property
    def custom(self) -> Any:  # noqa: ANN401
        """Get implementation specific data related to the target.

        python-tuf does not use or validate this data.
        """
        return self.unrecognized_fields.get("custom")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TargetFile):
            return False

        return (
            self.length == other.length
            and self.hashes == other.hashes
            and self.path == other.path
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # Dicts are unhashable, so hash order-independent projections that
        # are equal whenever the dicts compare equal. ``hashes`` maps strings
        # to strings, so its items can be hashed. The values of
        # ``unrecognized_fields`` are arbitrary and possibly unhashable, so
        # only its keys are used.
        return hash(
            (
                self.length,
                frozenset(self.hashes.items()),
                self.path,
                frozenset(self.unrecognized_fields),
            )
        )

    @classmethod
    def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile:
        """Create ``TargetFile`` object from its json/dict representation.

        The recognized entries are popped from ``target_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        length = target_dict.pop("length")
        hashes = target_dict.pop("hashes")

        # All fields left in the target_dict are unrecognized.
        return cls(length, hashes, path, target_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serializable dictionary representation of self."""
        return {
            "length": self.length,
            "hashes": self.hashes,
            **self.unrecognized_fields,
        }

    @classmethod
    def from_file(
        cls,
        target_file_path: str,
        local_path: str,
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from a file.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            local_path: Local path to target file content.
            hash_algorithms: Hash algorithms to calculate hashes with. If not
                specified, "sha256" is used.

        Raises:
            FileNotFoundError: The file doesn't exist.
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        with open(local_path, "rb") as file:
            return cls.from_data(target_file_path, file, hash_algorithms)

    @classmethod
    def from_data(
        cls,
        target_file_path: str,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from bytes.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            data: Target file content.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, "sha256" is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(length, hashes, target_file_path)

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that length and hashes of ``data`` match expected values.

        Args:
            data: Target file object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        self._verify_length(data, self.length)
        self._verify_hashes(data, self.hashes)

    def get_prefixed_paths(self) -> list[str]:
        """
        Return hash-prefixed URL path fragments for the target file path.

        One path is produced per entry in ``hashes``: the hash value and a
        "." are inserted in front of the last path component.
        """
        # rpartition leaves parent and sep empty when the path has no "/".
        parent, sep, name = self.path.rpartition("/")
        return [
            f"{parent}{sep}{hash_value}.{name}"
            for hash_value in self.hashes.values()
        ]

1684 

1685 

class Targets(Signed, _DelegatorMixin):
    """A container for the signed part of targets metadata.

    Targets contains verifying information about target files and also
    delegates responsibility to other Targets roles.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        targets: Dictionary of target filenames to TargetFiles. Default is an
            empty dictionary.
        delegations: Defines how this Targets delegates responsibility to other
            Targets Metadata files. Default is None.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TARGETS

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        targets: dict[str, TargetFile] | None = None,
        delegations: Delegations | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.targets = targets if targets is not None else {}
        self.delegations = delegations

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Targets):
            return False

        return (
            super().__eq__(other)
            and self.targets == other.targets
            and self.delegations == other.delegations
        )

    def __hash__(self) -> int:
        # ``self.targets`` is a dict and therefore unhashable; hash the set
        # of target paths instead. Equal Targets have equal ``targets`` dicts
        # and hence equal key sets, so this stays consistent with ``__eq__``.
        # ``delegations`` (possibly None) is hashed through its own __hash__.
        return hash(
            (super().__hash__(), frozenset(self.targets), self.delegations)
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Targets:
        """Create ``Targets`` object from its json/dict representation.

        The recognized entries are popped from ``signed_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        targets = signed_dict.pop(_TARGETS)
        # "delegations" is optional: a missing key means no delegations.
        try:
            delegations_dict = signed_dict.pop("delegations")
        except KeyError:
            delegations = None
        else:
            delegations = Delegations.from_dict(delegations_dict)
        res_targets = {
            target_path: TargetFile.from_dict(target_info, target_path)
            for target_path, target_info in targets.items()
        }
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, res_targets, delegations, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        targets_dict = self._common_fields_to_dict()
        targets_dict[_TARGETS] = {
            target_path: target_file_obj.to_dict()
            for target_path, target_file_obj in self.targets.items()
        }
        if self.delegations is not None:
            targets_dict["delegations"] = self.delegations.to_dict()
        return targets_dict

    def add_key(self, key: Key, role: str | None = None) -> None:
        """Add new signing key for delegated role ``role``.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if there are no
                delegated roles or if ``role`` is not delegated by this Target.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            # Avoid listing the same keyid twice for a role.
            if key.keyid not in self.delegations.roles[role].keyids:
                self.delegations.roles[role].keyids.append(key.keyid)

        elif self.delegations.succinct_roles is not None:
            if key.keyid not in self.delegations.succinct_roles.keyids:
                self.delegations.succinct_roles.keyids.append(key.keyid)

        self.delegations.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str | None = None) -> None:
        """Revokes key from delegated role ``role`` and updates the delegations
        key store.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If there are no delegated roles or if ``role`` is not
                delegated by this ``Target`` or if key is not used by ``role``
                or if key with id ``keyid`` is not used by succinct roles.
        """
        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            if keyid not in self.delegations.roles[role].keyids:
                raise ValueError(f"Key with id {keyid} is not used by {role}")

            self.delegations.roles[role].keyids.remove(keyid)
            # Keep the key in the key store while any other delegated role
            # still references it.
            if any(
                keyid in keyinfo.keyids
                for keyinfo in self.delegations.roles.values()
            ):
                return

        elif self.delegations.succinct_roles is not None:
            if keyid not in self.delegations.succinct_roles.keyids:
                raise ValueError(
                    f"Key with id {keyid} is not used by succinct_roles"
                )

            self.delegations.succinct_roles.keyids.remove(keyid)

        del self.delegations.keys[keyid]

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")

        role: Role | None = None
        if self.delegations.roles is not None:
            role = self.delegations.roles.get(delegated_role)
        elif self.delegations.succinct_roles is not None:
            succinct = self.delegations.succinct_roles
            # All bins share the one SuccinctRoles object as their role.
            if succinct.is_delegated_role(delegated_role):
                role = succinct

        if not role:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return role

    def get_key(self, keyid: str) -> Key:
        """Return the delegation key with identifier ``keyid``.

        Args:
            keyid: Identifier of the key to look up in ``delegations.keys``.

        Raises:
            ValueError: If there are no delegations or no key with ``keyid``
                is stored in them.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")
        if keyid not in self.delegations.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.delegations.keys[keyid]