Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/_payload.py: 26%


1# Copyright the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4 

5"""Helper classes for low-level Metadata API.""" 

6 

7from __future__ import annotations 

8 

9import abc 

10import fnmatch 

11import io 

12import logging 

13from dataclasses import dataclass 

14from datetime import datetime, timezone 

15from typing import ( 

16 IO, 

17 TYPE_CHECKING, 

18 Any, 

19 ClassVar, 

20 TypeVar, 

21) 

22 

23from securesystemslib import exceptions as sslib_exceptions 

24from securesystemslib import hash as sslib_hash 

25from securesystemslib.signer import Key, Signature 

26 

27from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError 

28 

29if TYPE_CHECKING: 

30 from collections.abc import Iterator 

31 

32_ROOT = "root" 

33_SNAPSHOT = "snapshot" 

34_TARGETS = "targets" 

35_TIMESTAMP = "timestamp" 

36 

37# We aim to support SPECIFICATION_VERSION and require the input metadata 

38# files to have the same major version (the first number) as ours. 

39SPECIFICATION_VERSION = ["1", "0", "31"] 

40TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS} 

41 

42logger = logging.getLogger(__name__) 

43 

44# T is a Generic type constraint for container payloads 

45T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") 

46 

47 

48class Signed(metaclass=abc.ABCMeta): 

49 """A base class for the signed part of TUF metadata. 

50 

51 Objects with base class Signed are usually included in a ``Metadata`` object

52 as the ``signed`` attribute. This class provides attributes and methods that

53 are common for all TUF metadata types (roles). 

54 

55 *All parameters named below are not just constructor arguments but also 

56 instance attributes.* 

57 

58 Args: 

59 version: Metadata version number. If None, then 1 is assigned. 

60 spec_version: Supported TUF specification version. If None, then the 

61 version currently supported by the library is assigned. 

62 expires: Metadata expiry date in UTC timezone. If None, then current 

63 date and time is assigned. 

64 unrecognized_fields: Dictionary of all attributes that are not managed 

65 by TUF Metadata API 

66 

67 Raises: 

68 ValueError: Invalid arguments. 

69 """ 

70 

71 # type is required for static reference without changing the API 

72 type: ClassVar[str] = "signed" 

73 

74 # _type and type are identical: 1st replicates file format, 2nd passes lint 

75 @property 

76 def _type(self) -> str: 

77 return self.type 

78 

79 @property 

80 def expires(self) -> datetime: 

81 """Get the metadata expiry date.""" 

82 return self._expires 

83 

84 @expires.setter 

85 def expires(self, value: datetime) -> None: 

86 """Set the metadata expiry date. 

87 

88 # Use the 'datetime' module to e.g. expire seven days from now:

89 obj.expires = datetime.now(timezone.utc) + timedelta(days=7)

90 """ 

91 self._expires = value.replace(microsecond=0) 

92 if self._expires.tzinfo is None: 

93 # Naive datetime: just make it UTC 

94 self._expires = self._expires.replace(tzinfo=timezone.utc) 

95 elif self._expires.tzinfo != timezone.utc: 

96 raise ValueError(f"Expected tz UTC, not {self._expires.tzinfo}") 

97 

98 # NOTE: Signed is a stupid name, because this might not be signed yet, but 

99 # we keep it to match spec terminology (I often refer to this as "payload", 

100 # or "inner metadata") 

101 def __init__( 

102 self, 

103 version: int | None, 

104 spec_version: str | None, 

105 expires: datetime | None, 

106 unrecognized_fields: dict[str, Any] | None, 

107 ): 

108 if spec_version is None: 

109 spec_version = ".".join(SPECIFICATION_VERSION) 

110 # Accept semver (X.Y.Z) but also X.Y for legacy compatibility 

111 spec_list = spec_version.split(".") 

112 if len(spec_list) not in [2, 3] or not all( 

113 el.isdigit() for el in spec_list 

114 ): 

115 raise ValueError(f"Failed to parse spec_version {spec_version}") 

116 

117 # major version must match 

118 if spec_list[0] != SPECIFICATION_VERSION[0]: 

119 raise ValueError(f"Unsupported spec_version {spec_version}") 

120 

121 self.spec_version = spec_version 

122 

123 self.expires = expires or datetime.now(timezone.utc) 

124 

125 if version is None: 

126 version = 1 

127 elif version <= 0: 

128 raise ValueError(f"version must be > 0, got {version}") 

129 self.version = version 

130 

131 if unrecognized_fields is None: 

132 unrecognized_fields = {} 

133 

134 self.unrecognized_fields = unrecognized_fields 

135 

136 def __eq__(self, other: object) -> bool: 

137 if not isinstance(other, Signed): 

138 return False 

139 

140 return ( 

141 self.type == other.type 

142 and self.version == other.version 

143 and self.spec_version == other.spec_version 

144 and self.expires == other.expires 

145 and self.unrecognized_fields == other.unrecognized_fields 

146 ) 

147 

148 @abc.abstractmethod 

149 def to_dict(self) -> dict[str, Any]: 

150 """Serialize and return a dict representation of self.""" 

151 raise NotImplementedError 

152 

153 @classmethod 

154 @abc.abstractmethod 

155 def from_dict(cls, signed_dict: dict[str, Any]) -> Signed: 

156 """Deserialization helper, creates object from json/dict 

157 representation. 

158 """ 

159 raise NotImplementedError 

160 

161 @classmethod 

162 def _common_fields_from_dict( 

163 cls, signed_dict: dict[str, Any] 

164 ) -> tuple[int, str, datetime]: 

165 """Return common fields of ``Signed`` instances from the passed dict 

166 representation, and returns an ordered list to be passed as leading 

167 positional arguments to a subclass constructor. 

168 

169 See ``{Root, Timestamp, Snapshot, Targets}.from_dict`` 

170 methods for usage. 

171 

172 """ 

173 _type = signed_dict.pop("_type") 

174 if _type != cls.type: 

175 raise ValueError(f"Expected type {cls.type}, got {_type}") 

176 

177 version = signed_dict.pop("version") 

178 spec_version = signed_dict.pop("spec_version") 

179 expires_str = signed_dict.pop("expires") 

180 # Convert 'expires' TUF metadata string to a datetime object, which is 

181 # what the constructor expects and what we store. The inverse operation 

182 # is implemented in '_common_fields_to_dict'. 

183 expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ").replace( 

184 tzinfo=timezone.utc 

185 ) 

186 

187 return version, spec_version, expires 

188 

189 def _common_fields_to_dict(self) -> dict[str, Any]: 

190 """Return a dict representation of common fields of 

191 ``Signed`` instances. 

192 

193 See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage. 

194 

195 """ 

196 return { 

197 "_type": self._type, 

198 "version": self.version, 

199 "spec_version": self.spec_version, 

200 "expires": self.expires.strftime("%Y-%m-%dT%H:%M:%SZ"), 

201 **self.unrecognized_fields, 

202 } 

203 

204 def is_expired(self, reference_time: datetime | None = None) -> bool: 

205 """Check metadata expiration against a reference time. 

206 

207 Args: 

208 reference_time: Time to check expiration date against. A timezone-aware

209 datetime in UTC is expected. Default is the current UTC date and time.

210 

211 Returns: 

212 ``True`` if expiration time is less than the reference time. 

213 """ 

214 if reference_time is None: 

215 reference_time = datetime.now(timezone.utc) 

216 

217 return reference_time >= self.expires 

218 
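
As an illustration of the ``expires`` setter and ``is_expired()`` above, here is a minimal sketch (not part of the measured source). It assumes the classes are importable via the usual ``tuf.api.metadata`` re-exports, and the seven-day window is an arbitrary example value:

    from datetime import datetime, timedelta, timezone

    from tuf.api.metadata import Timestamp

    timestamp = Timestamp(expires=datetime.now(timezone.utc) + timedelta(days=7))
    assert not timestamp.is_expired()
    # A timezone-aware reference time past the expiry date flips the result.
    assert timestamp.is_expired(datetime.now(timezone.utc) + timedelta(days=8))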

219 

220class Role: 

221 """Container that defines which keys are required to sign roles metadata. 

222 

223 Role defines how many keys are required to successfully sign the roles 

224 metadata, and which keys are accepted. 

225 

226 *All parameters named below are not just constructor arguments but also 

227 instance attributes.* 

228 

229 Args: 

230 keyids: The role's signing key identifiers.

231 threshold: Number of keys required to sign this role's metadata. 

232 unrecognized_fields: Dictionary of all attributes that are not managed 

233 by TUF Metadata API 

234 

235 Raises: 

236 ValueError: Invalid arguments. 

237 """ 

238 

239 def __init__( 

240 self, 

241 keyids: list[str], 

242 threshold: int, 

243 unrecognized_fields: dict[str, Any] | None = None, 

244 ): 

245 if len(set(keyids)) != len(keyids): 

246 raise ValueError(f"Nonunique keyids: {keyids}") 

247 if threshold < 1: 

248 raise ValueError("threshold should be at least 1!") 

249 self.keyids = keyids 

250 self.threshold = threshold 

251 if unrecognized_fields is None: 

252 unrecognized_fields = {} 

253 

254 self.unrecognized_fields = unrecognized_fields 

255 

256 def __eq__(self, other: object) -> bool: 

257 if not isinstance(other, Role): 

258 return False 

259 

260 return ( 

261 self.keyids == other.keyids 

262 and self.threshold == other.threshold 

263 and self.unrecognized_fields == other.unrecognized_fields 

264 ) 

265 

266 @classmethod 

267 def from_dict(cls, role_dict: dict[str, Any]) -> Role: 

268 """Create ``Role`` object from its json/dict representation. 

269 

270 Raises: 

271 ValueError, KeyError: Invalid arguments. 

272 """ 

273 keyids = role_dict.pop("keyids") 

274 threshold = role_dict.pop("threshold") 

275 # All fields left in the role_dict are unrecognized. 

276 return cls(keyids, threshold, role_dict) 

277 

278 def to_dict(self) -> dict[str, Any]: 

279 """Return the dictionary representation of self.""" 

280 return { 

281 "keyids": self.keyids, 

282 "threshold": self.threshold, 

283 **self.unrecognized_fields, 

284 } 

285 
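
A small round-trip sketch for ``Role`` follows; the keyids are hypothetical placeholders and the import path assumes the ``tuf.api.metadata`` re-export:

    from tuf.api.metadata import Role

    role = Role(keyids=["keyid-1", "keyid-2"], threshold=1)
    # to_dict() builds a fresh dict, which from_dict() then pops fields from.
    assert Role.from_dict(role.to_dict()) == role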

286 

287@dataclass 

288class VerificationResult: 

289 """Signature verification result for delegated role metadata. 

290 

291 Attributes: 

292 threshold: Number of required signatures. 

293 signed: dict of keyid to Key, containing keys that have signed. 

294 unsigned: dict of keyid to Key, containing keys that have not signed. 

295 """ 

296 

297 threshold: int 

298 signed: dict[str, Key] 

299 unsigned: dict[str, Key] 

300 

301 def __bool__(self) -> bool: 

302 return self.verified 

303 

304 @property 

305 def verified(self) -> bool: 

306 """True if threshold of signatures is met.""" 

307 return len(self.signed) >= self.threshold 

308 

309 @property 

310 def missing(self) -> int: 

311 """Number of additional signatures required to reach threshold.""" 

312 return max(0, self.threshold - len(self.signed)) 

313 
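
A sketch of how ``VerificationResult`` acts as a boolean and reports missing signatures; the key material below is a placeholder, not a real public key:

    from securesystemslib.signer import Key

    from tuf.api.metadata import VerificationResult

    key = Key.from_dict(
        "keyid-a",
        {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "00" * 32}},
    )
    result = VerificationResult(threshold=2, signed={key.keyid: key}, unsigned={})
    assert not result.verified  # only one of the two required signatures is present
    assert result.missing == 1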

314 

315@dataclass 

316class RootVerificationResult: 

317 """Signature verification result for root metadata. 

318 

319 Root must be verified by itself and the previous root version. This 

320 dataclass represents both results. For the edge case of first version 

321 of root, these underlying results are identical. 

322 

323 Note that `signed` and `unsigned` correctness requires the underlying 

324 VerificationResult keys to not conflict (no reusing the same keyid for 

325 different keys). 

326 

327 Attributes: 

328 first: First underlying VerificationResult 

329 second: Second underlying VerificationResult 

330 """ 

331 

332 first: VerificationResult 

333 second: VerificationResult 

334 

335 def __bool__(self) -> bool: 

336 return self.verified 

337 

338 @property 

339 def verified(self) -> bool: 

340 """True if threshold of signatures is met in both underlying 

341 VerificationResults. 

342 """ 

343 return self.first.verified and self.second.verified 

344 

345 @property 

346 def signed(self) -> dict[str, Key]: 

347 """Dictionary of all signing keys that have signed, from both 

348 VerificationResults. 

349 (The result is the union of both ``signed`` dicts, built with dict

350 unpacking.)

351 """ 

352 return {**self.first.signed, **self.second.signed} 

353 

354 @property 

355 def unsigned(self) -> dict[str, Key]: 

356 """Dictionary of all signing keys that have not signed, from both 

357 VerificationResults. 

358 (The result is the union of both ``unsigned`` dicts, built with dict

359 unpacking.)

360 """ 

361 return {**self.first.unsigned, **self.second.unsigned} 

362 

363 

364class _DelegatorMixin(metaclass=abc.ABCMeta): 

365 """Class that implements verify_delegate() for Root and Targets""" 

366 

367 @abc.abstractmethod 

368 def get_delegated_role(self, delegated_role: str) -> Role: 

369 """Return the role object for the given delegated role. 

370 

371 Raises ValueError if delegated_role is not actually delegated. 

372 """ 

373 raise NotImplementedError 

374 

375 @abc.abstractmethod 

376 def get_key(self, keyid: str) -> Key: 

377 """Return the key object for the given keyid. 

378 

379 Raises ValueError if key is not found. 

380 """ 

381 raise NotImplementedError 

382 

383 def get_verification_result( 

384 self, 

385 delegated_role: str, 

386 payload: bytes, 

387 signatures: dict[str, Signature], 

388 ) -> VerificationResult: 

389 """Return signature threshold verification result for delegated role. 

390 

391 NOTE: Unlike `verify_delegate()` this method does not raise, if the 

392 role metadata is not fully verified. 

393 

394 Args: 

395 delegated_role: Name of the delegated role to verify 

396 payload: Signed payload bytes for the delegated role 

397 signatures: Signatures over payload bytes 

398 

399 Raises: 

400 ValueError: no delegation was found for ``delegated_role``. 

401 """ 

402 role = self.get_delegated_role(delegated_role) 

403 

404 signed = {} 

405 unsigned = {} 

406 

407 for keyid in role.keyids: 

408 try: 

409 key = self.get_key(keyid) 

410 except ValueError: 

411 logger.info("No key for keyid %s", keyid) 

412 continue 

413 

414 if keyid not in signatures: 

415 unsigned[keyid] = key 

416 logger.info("No signature for keyid %s", keyid) 

417 continue 

418 

419 sig = signatures[keyid] 

420 try: 

421 key.verify_signature(sig, payload) 

422 signed[keyid] = key 

423 except sslib_exceptions.UnverifiedSignatureError: 

424 unsigned[keyid] = key 

425 logger.info("Key %s failed to verify %s", keyid, delegated_role) 

426 

427 return VerificationResult(role.threshold, signed, unsigned) 

428 

429 def verify_delegate( 

430 self, 

431 delegated_role: str, 

432 payload: bytes, 

433 signatures: dict[str, Signature], 

434 ) -> None: 

435 """Verify signature threshold for delegated role. 

436 

437 Verify that there are enough valid ``signatures`` over ``payload``, to 

438 meet the threshold of keys for ``delegated_role``, as defined by the 

439 delegator (``self``). 

440 

441 Args: 

442 delegated_role: Name of the delegated role to verify 

443 payload: Signed payload bytes for the delegated role 

444 signatures: Signatures over payload bytes 

445 

446 Raises: 

447 UnsignedMetadataError: ``delegated_role`` was not signed with 

448 the required threshold of keys.

449 ValueError: no delegation was found for ``delegated_role``. 

450 """ 

451 result = self.get_verification_result( 

452 delegated_role, payload, signatures 

453 ) 

454 if not result: 

455 raise UnsignedMetadataError( 

456 f"{delegated_role} was signed by {len(result.signed)}/" 

457 f"{result.threshold} keys" 

458 ) 

459 
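
To show how a delegator calls ``verify_delegate()``, the following hedged sketch signs a ``Timestamp`` and verifies it against a ``Root``. It assumes ``CryptoSigner.generate_ed25519()`` from securesystemslib and the ``Metadata.signed_bytes``/``signatures`` accessors, none of which are defined in this module:

    from securesystemslib.signer import CryptoSigner

    from tuf.api.metadata import Metadata, Root, Timestamp

    signer = CryptoSigner.generate_ed25519()
    root = Root()
    root.add_key(signer.public_key, "timestamp")

    md = Metadata(Timestamp())
    md.sign(signer)
    # Raises UnsignedMetadataError if the signature threshold is not met.
    root.verify_delegate("timestamp", md.signed_bytes, md.signatures)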

460 

461class Root(Signed, _DelegatorMixin): 

462 """A container for the signed part of root metadata. 

463 

464 Parameters listed below are also instance attributes. 

465 

466 Args: 

467 version: Metadata version number. Default is 1. 

468 spec_version: Supported TUF specification version. Default is the 

469 version currently supported by the library. 

470 expires: Metadata expiry date. Default is current date and time. 

471 keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. 

472 Default is empty dictionary. 

473 roles: Dictionary of role names to Roles. Defines which keys are 

474 required to sign the metadata for a specific role. Default is 

475 a dictionary of top level roles without keys and threshold of 1. 

476 consistent_snapshot: ``True`` if repository supports consistent 

477 snapshots. Default is True. 

478 unrecognized_fields: Dictionary of all attributes that are not managed 

479 by TUF Metadata API 

480 

481 Raises: 

482 ValueError: Invalid arguments. 

483 """ 

484 

485 type = _ROOT 

486 

487 def __init__( 

488 self, 

489 version: int | None = None, 

490 spec_version: str | None = None, 

491 expires: datetime | None = None, 

492 keys: dict[str, Key] | None = None, 

493 roles: dict[str, Role] | None = None, 

494 consistent_snapshot: bool | None = True, 

495 unrecognized_fields: dict[str, Any] | None = None, 

496 ): 

497 super().__init__(version, spec_version, expires, unrecognized_fields) 

498 self.consistent_snapshot = consistent_snapshot 

499 self.keys = keys if keys is not None else {} 

500 

501 if roles is None: 

502 roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES} 

503 elif set(roles) != TOP_LEVEL_ROLE_NAMES: 

504 raise ValueError("Role names must be the top-level metadata roles") 

505 self.roles = roles 

506 

507 def __eq__(self, other: object) -> bool: 

508 if not isinstance(other, Root): 

509 return False 

510 

511 return ( 

512 super().__eq__(other) 

513 and self.keys == other.keys 

514 and self.roles == other.roles 

515 and self.consistent_snapshot == other.consistent_snapshot 

516 ) 

517 

518 @classmethod 

519 def from_dict(cls, signed_dict: dict[str, Any]) -> Root: 

520 """Create ``Root`` object from its json/dict representation. 

521 

522 Raises: 

523 ValueError, KeyError, TypeError: Invalid arguments. 

524 """ 

525 common_args = cls._common_fields_from_dict(signed_dict) 

526 consistent_snapshot = signed_dict.pop("consistent_snapshot", None) 

527 keys = signed_dict.pop("keys") 

528 roles = signed_dict.pop("roles") 

529 

530 for keyid, key_dict in keys.items(): 

531 keys[keyid] = Key.from_dict(keyid, key_dict) 

532 for role_name, role_dict in roles.items(): 

533 roles[role_name] = Role.from_dict(role_dict) 

534 

535 # All fields left in the signed_dict are unrecognized. 

536 return cls(*common_args, keys, roles, consistent_snapshot, signed_dict) 

537 

538 def to_dict(self) -> dict[str, Any]: 

539 """Return the dict representation of self.""" 

540 root_dict = self._common_fields_to_dict() 

541 keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()} 

542 roles = {} 

543 for role_name, role in self.roles.items(): 

544 roles[role_name] = role.to_dict() 

545 if self.consistent_snapshot is not None: 

546 root_dict["consistent_snapshot"] = self.consistent_snapshot 

547 

548 root_dict.update( 

549 { 

550 "keys": keys, 

551 "roles": roles, 

552 } 

553 ) 

554 return root_dict 

555 

556 def add_key(self, key: Key, role: str) -> None: 

557 """Add new signing key for delegated role ``role``. 

558 

559 Args: 

560 key: Signing key to be added for ``role``. 

561 role: Name of the role, for which ``key`` is added. 

562 

563 Raises: 

564 ValueError: If the argument order is wrong or if ``role`` doesn't 

565 exist. 

566 """ 

567 # Verify that our users are not using the old argument order. 

568 if isinstance(role, Key): 

569 raise ValueError("Role must be a string, not a Key instance") 

570 

571 if role not in self.roles: 

572 raise ValueError(f"Role {role} doesn't exist") 

573 if key.keyid not in self.roles[role].keyids: 

574 self.roles[role].keyids.append(key.keyid) 

575 self.keys[key.keyid] = key 

576 

577 def revoke_key(self, keyid: str, role: str) -> None: 

578 """Revoke key from ``role`` and updates the key store. 

579 

580 Args: 

581 keyid: Identifier of the key to be removed for ``role``. 

582 role: Name of the role, for which a signing key is removed. 

583 

584 Raises: 

585 ValueError: If ``role`` doesn't exist or if ``role`` doesn't include 

586 the key. 

587 """ 

588 if role not in self.roles: 

589 raise ValueError(f"Role {role} doesn't exist") 

590 if keyid not in self.roles[role].keyids: 

591 raise ValueError(f"Key with id {keyid} is not used by {role}") 

592 self.roles[role].keyids.remove(keyid) 

593 for keyinfo in self.roles.values(): 

594 if keyid in keyinfo.keyids: 

595 return 

596 

597 del self.keys[keyid] 

598 

599 def get_delegated_role(self, delegated_role: str) -> Role: 

600 """Return the role object for the given delegated role. 

601 

602 Raises ValueError if delegated_role is not actually delegated. 

603 """ 

604 if delegated_role not in self.roles: 

605 raise ValueError(f"Delegated role {delegated_role} not found") 

606 

607 return self.roles[delegated_role] 

608 

609 def get_key(self, keyid: str) -> Key: 

610 if keyid not in self.keys: 

611 raise ValueError(f"Key {keyid} not found") 

612 

613 return self.keys[keyid] 

614 

615 def get_root_verification_result( 

616 self, 

617 previous: Root | None, 

618 payload: bytes, 

619 signatures: dict[str, Signature], 

620 ) -> RootVerificationResult: 

621 """Return signature threshold verification result for two root roles. 

622 

623 Verify root metadata with two roles (`self` and optionally `previous`). 

624 

625 If the repository has no root role versions yet, `previous` can be left 

626 None. In all other cases, `previous` must be the previous version of 

627 the Root. 

628 

629 NOTE: Unlike `verify_delegate()` this method does not raise, if the 

630 root metadata is not fully verified. 

631 

632 Args: 

633 previous: The previous `Root` to verify payload with, or None 

634 payload: Signed payload bytes for root 

635 signatures: Signatures over payload bytes 

636 

637 Raises: 

638 ValueError: no delegation was found for ``root`` or given Root 

639 versions are not sequential. 

640 """ 

641 

642 if previous is None: 

643 previous = self 

644 elif self.version != previous.version + 1: 

645 versions = f"v{previous.version} and v{self.version}" 

646 raise ValueError( 

647 f"Expected sequential root versions, got {versions}." 

648 ) 

649 

650 return RootVerificationResult( 

651 previous.get_verification_result(Root.type, payload, signatures), 

652 self.get_verification_result(Root.type, payload, signatures), 

653 ) 

654 
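
A brief sketch of bootstrapping a ``Root``, distributing one key across all top-level roles, and revoking it again; key generation uses securesystemslib's ``CryptoSigner``, which is an assumption of the example rather than something this module requires:

    from securesystemslib.signer import CryptoSigner

    from tuf.api.metadata import Root

    signer = CryptoSigner.generate_ed25519()
    root = Root()
    for role_name in ("root", "timestamp", "snapshot", "targets"):
        root.add_key(signer.public_key, role_name)
    # Revoking the key from every role also removes it from root.keys.
    for role_name in ("root", "timestamp", "snapshot", "targets"):
        root.revoke_key(signer.public_key.keyid, role_name)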

655 

656class BaseFile: 

657 """A base class of ``MetaFile`` and ``TargetFile``. 

658 

659 Encapsulates common static methods for length and hash verification. 

660 """ 

661 

662 @staticmethod 

663 def _verify_hashes( 

664 data: bytes | IO[bytes], expected_hashes: dict[str, str] 

665 ) -> None: 

666 """Verify that the hash of ``data`` matches ``expected_hashes``.""" 

667 is_bytes = isinstance(data, bytes) 

668 for algo, exp_hash in expected_hashes.items(): 

669 try: 

670 if is_bytes: 

671 digest_object = sslib_hash.digest(algo) 

672 digest_object.update(data) 

673 else: 

674 # if data is not bytes, assume it is a file object 

675 digest_object = sslib_hash.digest_fileobject(data, algo) 

676 except ( 

677 sslib_exceptions.UnsupportedAlgorithmError, 

678 sslib_exceptions.FormatError, 

679 ) as e: 

680 raise LengthOrHashMismatchError( 

681 f"Unsupported algorithm '{algo}'" 

682 ) from e 

683 

684 observed_hash = digest_object.hexdigest() 

685 if observed_hash != exp_hash: 

686 raise LengthOrHashMismatchError( 

687 f"Observed hash {observed_hash} does not match " 

688 f"expected hash {exp_hash}" 

689 ) 

690 

691 @staticmethod 

692 def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None: 

693 """Verify that the length of ``data`` matches ``expected_length``.""" 

694 if isinstance(data, bytes): 

695 observed_length = len(data) 

696 else: 

697 # if data is not bytes, assume it is a file object 

698 data.seek(0, io.SEEK_END) 

699 observed_length = data.tell() 

700 

701 if observed_length != expected_length: 

702 raise LengthOrHashMismatchError( 

703 f"Observed length {observed_length} does not match " 

704 f"expected length {expected_length}" 

705 ) 

706 

707 @staticmethod 

708 def _validate_hashes(hashes: dict[str, str]) -> None: 

709 if not hashes: 

710 raise ValueError("Hashes must be a non empty dictionary") 

711 for key, value in hashes.items(): 

712 if not (isinstance(key, str) and isinstance(value, str)): 

713 raise TypeError("Hashes items must be strings") 

714 

715 @staticmethod 

716 def _validate_length(length: int) -> None: 

717 if length < 0: 

718 raise ValueError(f"Length must be >= 0, got {length}") 

719 

720 @staticmethod 

721 def _get_length_and_hashes( 

722 data: bytes | IO[bytes], hash_algorithms: list[str] | None 

723 ) -> tuple[int, dict[str, str]]: 

724 """Calculate length and hashes of ``data``.""" 

725 if isinstance(data, bytes): 

726 length = len(data) 

727 else: 

728 data.seek(0, io.SEEK_END) 

729 length = data.tell() 

730 

731 hashes = {} 

732 

733 if hash_algorithms is None: 

734 hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM] 

735 

736 for algorithm in hash_algorithms: 

737 try: 

738 if isinstance(data, bytes): 

739 digest_object = sslib_hash.digest(algorithm) 

740 digest_object.update(data) 

741 else: 

742 digest_object = sslib_hash.digest_fileobject( 

743 data, algorithm 

744 ) 

745 except ( 

746 sslib_exceptions.UnsupportedAlgorithmError, 

747 sslib_exceptions.FormatError, 

748 ) as e: 

749 raise ValueError(f"Unsupported algorithm '{algorithm}'") from e 

750 

751 hashes[algorithm] = digest_object.hexdigest() 

752 

753 return (length, hashes) 

754 

755 

756class MetaFile(BaseFile): 

757 """A container with information about a particular metadata file. 

758 

759 *All parameters named below are not just constructor arguments but also 

760 instance attributes.* 

761 

762 Args: 

763 version: Version of the metadata file. 

764 length: Length of the metadata file in bytes. 

765 hashes: Dictionary of hash algorithm names to hashes of the metadata 

766 file content. 

767 unrecognized_fields: Dictionary of all attributes that are not managed 

768 by TUF Metadata API 

769 

770 Raises: 

771 ValueError, TypeError: Invalid arguments. 

772 """ 

773 

774 def __init__( 

775 self, 

776 version: int = 1, 

777 length: int | None = None, 

778 hashes: dict[str, str] | None = None, 

779 unrecognized_fields: dict[str, Any] | None = None, 

780 ): 

781 if version <= 0: 

782 raise ValueError(f"Metafile version must be > 0, got {version}") 

783 if length is not None: 

784 self._validate_length(length) 

785 if hashes is not None: 

786 self._validate_hashes(hashes) 

787 

788 self.version = version 

789 self.length = length 

790 self.hashes = hashes 

791 if unrecognized_fields is None: 

792 unrecognized_fields = {} 

793 

794 self.unrecognized_fields = unrecognized_fields 

795 

796 def __eq__(self, other: object) -> bool: 

797 if not isinstance(other, MetaFile): 

798 return False 

799 

800 return ( 

801 self.version == other.version 

802 and self.length == other.length 

803 and self.hashes == other.hashes 

804 and self.unrecognized_fields == other.unrecognized_fields 

805 ) 

806 

807 @classmethod 

808 def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile: 

809 """Create ``MetaFile`` object from its json/dict representation. 

810 

811 Raises: 

812 ValueError, KeyError: Invalid arguments. 

813 """ 

814 version = meta_dict.pop("version") 

815 length = meta_dict.pop("length", None) 

816 hashes = meta_dict.pop("hashes", None) 

817 

818 # All fields left in the meta_dict are unrecognized. 

819 return cls(version, length, hashes, meta_dict) 

820 

821 @classmethod 

822 def from_data( 

823 cls, 

824 version: int, 

825 data: bytes | IO[bytes], 

826 hash_algorithms: list[str], 

827 ) -> MetaFile: 

828 """Creates MetaFile object from bytes. 

829 This constructor should only be used if hashes are wanted. 

830 By default, MetaFile(ver) should be used. 

831 Args: 

832 version: Version of the metadata file. 

833 data: Metadata bytes that the metafile represents. 

834 hash_algorithms: Hash algorithms to create the hashes with. If not 

835 specified, the securesystemslib default hash algorithm is used. 

836 

837 Raises: 

838 ValueError: The hash algorithms list contains an unsupported 

839 algorithm. 

840 """ 

841 length, hashes = cls._get_length_and_hashes(data, hash_algorithms) 

842 return cls(version, length, hashes) 

843 

844 def to_dict(self) -> dict[str, Any]: 

845 """Return the dictionary representation of self.""" 

846 res_dict: dict[str, Any] = { 

847 "version": self.version, 

848 **self.unrecognized_fields, 

849 } 

850 

851 if self.length is not None: 

852 res_dict["length"] = self.length 

853 

854 if self.hashes is not None: 

855 res_dict["hashes"] = self.hashes 

856 

857 return res_dict 

858 

859 def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: 

860 """Verify that the length and hashes of ``data`` match expected values. 

861 

862 Args: 

863 data: File object or its content in bytes. 

864 

865 Raises: 

866 LengthOrHashMismatchError: Calculated length or hashes do not 

867 match expected values or hash algorithm is not supported. 

868 """ 

869 if self.length is not None: 

870 self._verify_length(data, self.length) 

871 

872 if self.hashes is not None: 

873 self._verify_hashes(data, self.hashes) 

874 
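
A short sketch of ``MetaFile.from_data()`` and ``verify_length_and_hashes()`` on in-memory bytes; the payload and version number are arbitrary example values:

    from tuf.api.metadata import MetaFile

    data = b'{"signed": {}, "signatures": []}'
    meta = MetaFile.from_data(version=3, data=data, hash_algorithms=["sha256"])
    # Passes silently; a length or digest mismatch raises LengthOrHashMismatchError.
    meta.verify_length_and_hashes(data)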

875 

876class Timestamp(Signed): 

877 """A container for the signed part of timestamp metadata. 

878 

879 The TUF file format stores the snapshot information in a dictionary;

880 ``Timestamp.snapshot_meta`` instead exposes it directly as a

881 ``MetaFile``.

882 

883 *All parameters named below are not just constructor arguments but also 

884 instance attributes.* 

885 

886 Args: 

887 version: Metadata version number. Default is 1. 

888 spec_version: Supported TUF specification version. Default is the 

889 version currently supported by the library. 

890 expires: Metadata expiry date. Default is current date and time. 

891 unrecognized_fields: Dictionary of all attributes that are not managed 

892 by TUF Metadata API 

893 snapshot_meta: Meta information for snapshot metadata. Default is a 

894 MetaFile with version 1. 

895 

896 Raises: 

897 ValueError: Invalid arguments. 

898 """ 

899 

900 type = _TIMESTAMP 

901 

902 def __init__( 

903 self, 

904 version: int | None = None, 

905 spec_version: str | None = None, 

906 expires: datetime | None = None, 

907 snapshot_meta: MetaFile | None = None, 

908 unrecognized_fields: dict[str, Any] | None = None, 

909 ): 

910 super().__init__(version, spec_version, expires, unrecognized_fields) 

911 self.snapshot_meta = snapshot_meta or MetaFile(1) 

912 

913 def __eq__(self, other: object) -> bool: 

914 if not isinstance(other, Timestamp): 

915 return False 

916 

917 return ( 

918 super().__eq__(other) and self.snapshot_meta == other.snapshot_meta 

919 ) 

920 

921 @classmethod 

922 def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp: 

923 """Create ``Timestamp`` object from its json/dict representation. 

924 

925 Raises: 

926 ValueError, KeyError: Invalid arguments. 

927 """ 

928 common_args = cls._common_fields_from_dict(signed_dict) 

929 meta_dict = signed_dict.pop("meta") 

930 snapshot_meta = MetaFile.from_dict(meta_dict["snapshot.json"]) 

931 # All fields left in the timestamp_dict are unrecognized. 

932 return cls(*common_args, snapshot_meta, signed_dict) 

933 

934 def to_dict(self) -> dict[str, Any]: 

935 """Return the dict representation of self.""" 

936 res_dict = self._common_fields_to_dict() 

937 res_dict["meta"] = {"snapshot.json": self.snapshot_meta.to_dict()} 

938 return res_dict 

939 
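
The dict form produced by ``Timestamp.to_dict()`` can be illustrated with a small sketch (version 5 is an arbitrary example):

    from tuf.api.metadata import MetaFile, Timestamp

    timestamp = Timestamp(snapshot_meta=MetaFile(version=5))
    assert timestamp.to_dict()["meta"]["snapshot.json"]["version"] == 5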

940 

941class Snapshot(Signed): 

942 """A container for the signed part of snapshot metadata. 

943 

944 Snapshot contains information about all target Metadata files. 

945 

946 *All parameters named below are not just constructor arguments but also 

947 instance attributes.* 

948 

949 Args: 

950 version: Metadata version number. Default is 1. 

951 spec_version: Supported TUF specification version. Default is the 

952 version currently supported by the library. 

953 expires: Metadata expiry date. Default is current date and time. 

954 unrecognized_fields: Dictionary of all attributes that are not managed 

955 by TUF Metadata API 

956 meta: Dictionary of targets filenames to ``MetaFile`` objects. Default 

957 is a dictionary with a MetaFile for "targets.json" version 1.

958 

959 Raises: 

960 ValueError: Invalid arguments. 

961 """ 

962 

963 type = _SNAPSHOT 

964 

965 def __init__( 

966 self, 

967 version: int | None = None, 

968 spec_version: str | None = None, 

969 expires: datetime | None = None, 

970 meta: dict[str, MetaFile] | None = None, 

971 unrecognized_fields: dict[str, Any] | None = None, 

972 ): 

973 super().__init__(version, spec_version, expires, unrecognized_fields) 

974 self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} 

975 

976 def __eq__(self, other: object) -> bool: 

977 if not isinstance(other, Snapshot): 

978 return False 

979 

980 return super().__eq__(other) and self.meta == other.meta 

981 

982 @classmethod 

983 def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot: 

984 """Create ``Snapshot`` object from its json/dict representation. 

985 

986 Raises: 

987 ValueError, KeyError: Invalid arguments. 

988 """ 

989 common_args = cls._common_fields_from_dict(signed_dict) 

990 meta_dicts = signed_dict.pop("meta") 

991 meta = {} 

992 for meta_path, meta_dict in meta_dicts.items(): 

993 meta[meta_path] = MetaFile.from_dict(meta_dict) 

994 # All fields left in the snapshot_dict are unrecognized. 

995 return cls(*common_args, meta, signed_dict) 

996 

997 def to_dict(self) -> dict[str, Any]: 

998 """Return the dict representation of self.""" 

999 snapshot_dict = self._common_fields_to_dict() 

1000 meta_dict = {} 

1001 for meta_path, meta_info in self.meta.items(): 

1002 meta_dict[meta_path] = meta_info.to_dict() 

1003 

1004 snapshot_dict["meta"] = meta_dict 

1005 return snapshot_dict 

1006 
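
A round-trip sketch for ``Snapshot``; the filename and version are arbitrary examples:

    from tuf.api.metadata import MetaFile, Snapshot

    snapshot = Snapshot(meta={"targets.json": MetaFile(version=2)})
    assert Snapshot.from_dict(snapshot.to_dict()) == snapshot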

1007 

1008class DelegatedRole(Role): 

1009 """A container with information about a delegated role. 

1010 

1011 A delegation can happen in two ways: 

1012 

1013 - ``paths`` is set: delegates targets matching any path pattern in 

1014 ``paths`` 

1015 - ``path_hash_prefixes`` is set: delegates targets whose target path 

1016 hash starts with any of the prefixes in ``path_hash_prefixes`` 

1017 

1018 ``paths`` and ``path_hash_prefixes`` are mutually exclusive: 

1019 exactly one of them must be set.

1020 

1021 *All parameters named below are not just constructor arguments but also 

1022 instance attributes.* 

1023 

1024 Args: 

1025 name: Delegated role name. 

1026 keyids: Delegated role signing key identifiers. 

1027 threshold: Number of keys required to sign this role's metadata. 

1028 terminating: ``True`` if this delegation terminates a target lookup. 

1029 paths: Path patterns. See note above. 

1030 path_hash_prefixes: Hash prefixes. See note above. 

1031 unrecognized_fields: Dictionary of all attributes that are not managed 

1032 by TUF Metadata API. 

1033 

1034 Raises: 

1035 ValueError: Invalid arguments. 

1036 """ 

1037 

1038 def __init__( 

1039 self, 

1040 name: str, 

1041 keyids: list[str], 

1042 threshold: int, 

1043 terminating: bool, 

1044 paths: list[str] | None = None, 

1045 path_hash_prefixes: list[str] | None = None, 

1046 unrecognized_fields: dict[str, Any] | None = None, 

1047 ): 

1048 super().__init__(keyids, threshold, unrecognized_fields) 

1049 self.name = name 

1050 self.terminating = terminating 

1051 exclusive_vars = [paths, path_hash_prefixes] 

1052 if sum(1 for var in exclusive_vars if var is not None) != 1: 

1053 raise ValueError( 

1054 "Only one of (paths, path_hash_prefixes) must be set" 

1055 ) 

1056 

1057 if paths is not None and any(not isinstance(p, str) for p in paths): 

1058 raise ValueError("Paths must be strings") 

1059 if path_hash_prefixes is not None and any( 

1060 not isinstance(p, str) for p in path_hash_prefixes 

1061 ): 

1062 raise ValueError("Path_hash_prefixes must be strings") 

1063 

1064 self.paths = paths 

1065 self.path_hash_prefixes = path_hash_prefixes 

1066 

1067 def __eq__(self, other: object) -> bool: 

1068 if not isinstance(other, DelegatedRole): 

1069 return False 

1070 

1071 return ( 

1072 super().__eq__(other) 

1073 and self.name == other.name 

1074 and self.terminating == other.terminating 

1075 and self.paths == other.paths 

1076 and self.path_hash_prefixes == other.path_hash_prefixes 

1077 ) 

1078 

1079 @classmethod 

1080 def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole: 

1081 """Create ``DelegatedRole`` object from its json/dict representation. 

1082 

1083 Raises: 

1084 ValueError, KeyError, TypeError: Invalid arguments. 

1085 """ 

1086 name = role_dict.pop("name") 

1087 keyids = role_dict.pop("keyids") 

1088 threshold = role_dict.pop("threshold") 

1089 terminating = role_dict.pop("terminating") 

1090 paths = role_dict.pop("paths", None) 

1091 path_hash_prefixes = role_dict.pop("path_hash_prefixes", None) 

1092 # All fields left in the role_dict are unrecognized. 

1093 return cls( 

1094 name, 

1095 keyids, 

1096 threshold, 

1097 terminating, 

1098 paths, 

1099 path_hash_prefixes, 

1100 role_dict, 

1101 ) 

1102 

1103 def to_dict(self) -> dict[str, Any]: 

1104 """Return the dict representation of self.""" 

1105 base_role_dict = super().to_dict() 

1106 res_dict = { 

1107 "name": self.name, 

1108 "terminating": self.terminating, 

1109 **base_role_dict, 

1110 } 

1111 if self.paths is not None: 

1112 res_dict["paths"] = self.paths 

1113 elif self.path_hash_prefixes is not None: 

1114 res_dict["path_hash_prefixes"] = self.path_hash_prefixes 

1115 return res_dict 

1116 

1117 @staticmethod 

1118 def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool: 

1119 """Determine whether ``targetpath`` matches the ``pathpattern``.""" 

1120 # We need to make sure that targetpath and pathpattern have the same

1121 # number of directory levels, since fnmatch doesn't treat "/" as a special symbol.

1122 target_parts = targetpath.split("/") 

1123 pattern_parts = pathpattern.split("/") 

1124 if len(target_parts) != len(pattern_parts): 

1125 return False 

1126 

1127 # Every part of the pathpattern may contain a glob pattern, so each

1128 # target part must match its corresponding pathpattern part.

1129 for target_dir, pattern_dir in zip(target_parts, pattern_parts): 

1130 if not fnmatch.fnmatch(target_dir, pattern_dir): 

1131 return False 

1132 

1133 return True 

1134 

1135 def is_delegated_path(self, target_filepath: str) -> bool: 

1136 """Determine whether the given ``target_filepath`` is in one of 

1137 the paths that ``DelegatedRole`` is trusted to provide. 

1138 

1139 The ``target_filepath`` and the ``DelegatedRole`` paths are expected to 

1140 be in their canonical forms, so e.g. "a/b" instead of "a//b". Only "/"

1141 is supported as target path separator. Leading separators are not 

1142 handled as special cases (see `TUF specification on targetpath 

1143 <https://theupdateframework.github.io/specification/latest/#targetpath>`_). 

1144 

1145 Args: 

1146 target_filepath: URL path to a target file, relative to a base 

1147 targets URL. 

1148 """ 

1149 

1150 if self.path_hash_prefixes is not None: 

1151 # Calculate the hash of the filepath 

1152 # to determine in which bin to find the target. 

1153 digest_object = sslib_hash.digest(algorithm="sha256") 

1154 digest_object.update(target_filepath.encode("utf-8")) 

1155 target_filepath_hash = digest_object.hexdigest() 

1156 

1157 for path_hash_prefix in self.path_hash_prefixes: 

1158 if target_filepath_hash.startswith(path_hash_prefix): 

1159 return True 

1160 

1161 elif self.paths is not None: 

1162 for pathpattern in self.paths: 

1163 # A delegated role path may be an explicit path or glob 

1164 # pattern (Unix shell-style wildcards). 

1165 if self._is_target_in_pathpattern(target_filepath, pathpattern): 

1166 return True 

1167 

1168 return False 

1169 
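
A path-matching sketch for ``is_delegated_path()``; the role name, empty keyid list and patterns are hypothetical:

    from tuf.api.metadata import DelegatedRole

    role = DelegatedRole(
        name="files", keyids=[], threshold=1, terminating=False, paths=["files/*.txt"]
    )
    assert role.is_delegated_path("files/readme.txt")
    assert not role.is_delegated_path("other/readme.txt")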

1170 

1171class SuccinctRoles(Role): 

1172 """Succinctly defines a hash bin delegation graph. 

1173 

1174 A ``SuccinctRoles`` object describes a delegation graph that covers all 

1175 targets, distributing them uniformly over the delegated roles (i.e. bins) 

1176 in the graph. 

1177 

1178 The total number of bins is 2 to the power of the passed ``bit_length``. 

1179 

1180 Bin names are the concatenation of the passed ``name_prefix`` and a 

1181 zero-padded hex representation of the bin index separated by a hyphen. 

1182 

1183 The passed ``keyids`` and ``threshold`` are used for each bin, and each bin

1184 is 'terminating'. 

1185 

1186 For details: https://github.com/theupdateframework/taps/blob/master/tap15.md 

1187 

1188 Args: 

1189 keyids: Signing key identifiers for any bin metadata. 

1190 threshold: Number of keys required to sign any bin metadata. 

1191 bit_length: Number of bits between 1 and 32. 

1192 name_prefix: Prefix of all bin names. 

1193 unrecognized_fields: Dictionary of all attributes that are not managed 

1194 by TUF Metadata API. 

1195 

1196 Raises: 

1197 ValueError, TypeError, AttributeError: Invalid arguments. 

1198 """ 

1199 

1200 def __init__( 

1201 self, 

1202 keyids: list[str], 

1203 threshold: int, 

1204 bit_length: int, 

1205 name_prefix: str, 

1206 unrecognized_fields: dict[str, Any] | None = None, 

1207 ) -> None: 

1208 super().__init__(keyids, threshold, unrecognized_fields) 

1209 

1210 if bit_length <= 0 or bit_length > 32: 

1211 raise ValueError("bit_length must be between 1 and 32") 

1212 if not isinstance(name_prefix, str): 

1213 raise ValueError("name_prefix must be a string") 

1214 

1215 self.bit_length = bit_length 

1216 self.name_prefix = name_prefix 

1217 

1218 # Calculate the suffix_len value based on the total number of bins in 

1219 # hex. If bit_length = 10 then number_of_bins = 1024 and bin names will

1220 # have a suffix between "000" and "3ff" in hex and suffix_len will be 3 

1221 # meaning the third bin will have a suffix of "003". 

1222 self.number_of_bins = 2**bit_length 

1223 # suffix_len is calculated based on "number_of_bins - 1" as the name 

1224 # of the last bin contains the number "number_of_bins -1" as a suffix. 

1225 self.suffix_len = len(f"{self.number_of_bins - 1:x}") 

1226 

1227 def __eq__(self, other: object) -> bool: 

1228 if not isinstance(other, SuccinctRoles): 

1229 return False 

1230 

1231 return ( 

1232 super().__eq__(other) 

1233 and self.bit_length == other.bit_length 

1234 and self.name_prefix == other.name_prefix 

1235 ) 

1236 

1237 @classmethod 

1238 def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles: 

1239 """Create ``SuccinctRoles`` object from its json/dict representation. 

1240 

1241 Raises: 

1242 ValueError, KeyError, AttributeError, TypeError: Invalid arguments. 

1243 """ 

1244 keyids = role_dict.pop("keyids") 

1245 threshold = role_dict.pop("threshold") 

1246 bit_length = role_dict.pop("bit_length") 

1247 name_prefix = role_dict.pop("name_prefix") 

1248 # All fields left in the role_dict are unrecognized. 

1249 return cls(keyids, threshold, bit_length, name_prefix, role_dict) 

1250 

1251 def to_dict(self) -> dict[str, Any]: 

1252 """Return the dict representation of self.""" 

1253 base_role_dict = super().to_dict() 

1254 return { 

1255 "bit_length": self.bit_length, 

1256 "name_prefix": self.name_prefix, 

1257 **base_role_dict, 

1258 } 

1259 

1260 def get_role_for_target(self, target_filepath: str) -> str: 

1261 """Calculate the name of the delegated role responsible for 

1262 ``target_filepath``. 

1263 

1264 The target at path ``target_filepath`` is assigned to a bin by casting 

1265 the left-most ``bit_length`` bits of the file path hash digest to

1266 int, using it as bin index between 0 and ``2**bit_length - 1``. 

1267 

1268 Args: 

1269 target_filepath: URL path to a target file, relative to a base 

1270 targets URL. 

1271 """ 

1272 hasher = sslib_hash.digest(algorithm="sha256") 

1273 hasher.update(target_filepath.encode("utf-8")) 

1274 

1275 # We can't ever need more than 4 bytes (32 bits). 

1276 hash_bytes = hasher.digest()[:4] 

1277 # Right shift hash bytes, so that we only have the leftmost 

1278 # bit_length bits that we care about. 

1279 shift_value = 32 - self.bit_length 

1280 bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value 

1281 # Add zero padding if necessary and cast to hex the suffix. 

1282 suffix = f"{bin_number:0{self.suffix_len}x}" 

1283 return f"{self.name_prefix}-{suffix}" 

1284 

1285 def get_roles(self) -> Iterator[str]: 

1286 """Yield the names of all different delegated roles one by one.""" 

1287 for i in range(self.number_of_bins): 

1288 suffix = f"{i:0{self.suffix_len}x}" 

1289 yield f"{self.name_prefix}-{suffix}" 

1290 

1291 def is_delegated_role(self, role_name: str) -> bool: 

1292 """Determine whether the given ``role_name`` is in one of 

1293 the delegated roles that ``SuccinctRoles`` represents. 

1294 

1295 Args: 

1296 role_name: The name of the role to check against. 

1297 """ 

1298 desired_prefix = self.name_prefix + "-" 

1299 

1300 if not role_name.startswith(desired_prefix): 

1301 return False 

1302 

1303 suffix = role_name[len(desired_prefix) :] 

1304 if len(suffix) != self.suffix_len: 

1305 return False 

1306 

1307 try: 

1308 # make sure suffix is hex value 

1309 num = int(suffix, 16) 

1310 except ValueError: 

1311 return False 

1312 

1313 return 0 <= num < self.number_of_bins 

1314 
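
A sketch of bin naming with ``SuccinctRoles``, using an 8-bit layout (256 bins) chosen purely for illustration:

    from tuf.api.metadata import SuccinctRoles

    bins = SuccinctRoles(keyids=[], threshold=1, bit_length=8, name_prefix="bin")
    bin_name = bins.get_role_for_target("targets/file.txt")  # a name of the form "bin-<2 hex digits>"
    assert bins.is_delegated_role(bin_name)
    assert len(list(bins.get_roles())) == 256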

1315 

1316class Delegations: 

1317 """A container object storing information about all delegations. 

1318 

1319 *All parameters named below are not just constructor arguments but also 

1320 instance attributes.* 

1321 

1322 Args: 

1323 keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``. 

1324 roles: Ordered dictionary of role names to DelegatedRoles instances. It 

1325 defines which keys are required to sign the metadata for a specific 

1326 role. The roles order also defines the order that role delegations 

1327 are considered during target searches. 

1328 succinct_roles: Contains succinct information about hash bin 

1329 delegations. Note that succinct roles is not a TUF specification 

1330 feature yet and setting `succinct_roles` to a value makes the 

1331 resulting metadata non-compliant. The metadata will not be accepted 

1332 as valid by specification compliant clients such as those built with 

1333 python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md 

1334 unrecognized_fields: Dictionary of all attributes that are not managed 

1335 by TUF Metadata API 

1336 

1337 Exactly one of ``roles`` and ``succinct_roles`` must be set. 

1338 

1339 Raises: 

1340 ValueError: Invalid arguments. 

1341 """ 

1342 

1343 def __init__( 

1344 self, 

1345 keys: dict[str, Key], 

1346 roles: dict[str, DelegatedRole] | None = None, 

1347 succinct_roles: SuccinctRoles | None = None, 

1348 unrecognized_fields: dict[str, Any] | None = None, 

1349 ): 

1350 self.keys = keys 

1351 if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: 

1352 raise ValueError("One of roles and succinct_roles must be set") 

1353 

1354 if roles is not None: 

1355 for role in roles: 

1356 if not role or role in TOP_LEVEL_ROLE_NAMES: 

1357 raise ValueError( 

1358 "Delegated roles cannot be empty or use top-level " 

1359 "role names" 

1360 ) 

1361 

1362 self.roles = roles 

1363 self.succinct_roles = succinct_roles 

1364 if unrecognized_fields is None: 

1365 unrecognized_fields = {} 

1366 

1367 self.unrecognized_fields = unrecognized_fields 

1368 

1369 def __eq__(self, other: object) -> bool: 

1370 if not isinstance(other, Delegations): 

1371 return False 

1372 

1373 all_attributes_check = ( 

1374 self.keys == other.keys 

1375 and self.roles == other.roles 

1376 and self.succinct_roles == other.succinct_roles 

1377 and self.unrecognized_fields == other.unrecognized_fields 

1378 ) 

1379 

1380 if self.roles is not None and other.roles is not None: 

1381 all_attributes_check = ( 

1382 all_attributes_check 

1383 # Order of the delegated roles matters (see issue #1788). 

1384 and list(self.roles.items()) == list(other.roles.items()) 

1385 ) 

1386 

1387 return all_attributes_check 

1388 

1389 @classmethod 

1390 def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations: 

1391 """Create ``Delegations`` object from its json/dict representation. 

1392 

1393 Raises: 

1394 ValueError, KeyError, TypeError: Invalid arguments. 

1395 """ 

1396 keys = delegations_dict.pop("keys") 

1397 keys_res = {} 

1398 for keyid, key_dict in keys.items(): 

1399 keys_res[keyid] = Key.from_dict(keyid, key_dict) 

1400 roles = delegations_dict.pop("roles", None) 

1401 roles_res: dict[str, DelegatedRole] | None = None 

1402 

1403 if roles is not None: 

1404 roles_res = {} 

1405 for role_dict in roles: 

1406 new_role = DelegatedRole.from_dict(role_dict) 

1407 if new_role.name in roles_res: 

1408 raise ValueError(f"Duplicate role {new_role.name}") 

1409 roles_res[new_role.name] = new_role 

1410 

1411 succinct_roles_dict = delegations_dict.pop("succinct_roles", None) 

1412 succinct_roles_info = None 

1413 if succinct_roles_dict is not None: 

1414 succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict) 

1415 

1416 # All fields left in the delegations_dict are unrecognized. 

1417 return cls(keys_res, roles_res, succinct_roles_info, delegations_dict) 

1418 

1419 def to_dict(self) -> dict[str, Any]: 

1420 """Return the dict representation of self.""" 

1421 keys = {keyid: key.to_dict() for keyid, key in self.keys.items()} 

1422 res_dict: dict[str, Any] = { 

1423 "keys": keys, 

1424 **self.unrecognized_fields, 

1425 } 

1426 if self.roles is not None: 

1427 roles = [role_obj.to_dict() for role_obj in self.roles.values()] 

1428 res_dict["roles"] = roles 

1429 elif self.succinct_roles is not None: 

1430 res_dict["succinct_roles"] = self.succinct_roles.to_dict() 

1431 

1432 return res_dict 

1433 

1434 def get_roles_for_target( 

1435 self, target_filepath: str 

1436 ) -> Iterator[tuple[str, bool]]: 

1437 """Given ``target_filepath`` get names and terminating status of all 

1438 delegated roles that are responsible for it.

1439 

1440 Args: 

1441 target_filepath: URL path to a target file, relative to a base 

1442 targets URL. 

1443 """ 

1444 if self.roles is not None: 

1445 for role in self.roles.values(): 

1446 if role.is_delegated_path(target_filepath): 

1447 yield role.name, role.terminating 

1448 

1449 elif self.succinct_roles is not None: 

1450 # We consider all succinct_roles as terminating. 

1451 # For more information read TAP 15. 

1452 yield self.succinct_roles.get_role_for_target(target_filepath), True 

1453 
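
A sketch showing how a target path is resolved through ``get_roles_for_target()``; the role name and pattern are hypothetical and the empty key dict just keeps the example small:

    from tuf.api.metadata import DelegatedRole, Delegations

    files_role = DelegatedRole(
        name="files", keyids=[], threshold=1, terminating=True, paths=["files/*"]
    )
    delegations = Delegations(keys={}, roles={"files": files_role})
    assert list(delegations.get_roles_for_target("files/a.txt")) == [("files", True)]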

1454 

1455class TargetFile(BaseFile): 

1456 """A container with information about a particular target file. 

1457 

1458 *All parameters named below are not just constructor arguments but also 

1459 instance attributes.* 

1460 

1461 Args: 

1462 length: Length of the target file in bytes. 

1463 hashes: Dictionary of hash algorithm names to hashes of the target 

1464 file content. 

1465 path: URL path to a target file, relative to a base targets URL. 

1466 unrecognized_fields: Dictionary of all attributes that are not managed 

1467 by TUF Metadata API 

1468 

1469 Raises: 

1470 ValueError, TypeError: Invalid arguments. 

1471 """ 

1472 

1473 def __init__( 

1474 self, 

1475 length: int, 

1476 hashes: dict[str, str], 

1477 path: str, 

1478 unrecognized_fields: dict[str, Any] | None = None, 

1479 ): 

1480 self._validate_length(length) 

1481 self._validate_hashes(hashes) 

1482 

1483 self.length = length 

1484 self.hashes = hashes 

1485 self.path = path 

1486 if unrecognized_fields is None: 

1487 unrecognized_fields = {} 

1488 

1489 self.unrecognized_fields = unrecognized_fields 

1490 

1491 @property 

1492 def custom(self) -> Any: # noqa: ANN401 

1493 """Get implementation specific data related to the target. 

1494 

1495 python-tuf does not use or validate this data. 

1496 """ 

1497 return self.unrecognized_fields.get("custom") 

1498 

1499 def __eq__(self, other: object) -> bool: 

1500 if not isinstance(other, TargetFile): 

1501 return False 

1502 

1503 return ( 

1504 self.length == other.length 

1505 and self.hashes == other.hashes 

1506 and self.path == other.path 

1507 and self.unrecognized_fields == other.unrecognized_fields 

1508 ) 

1509 

1510 @classmethod 

1511 def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile: 

1512 """Create ``TargetFile`` object from its json/dict representation. 

1513 

1514 Raises: 

1515 ValueError, KeyError, TypeError: Invalid arguments. 

1516 """ 

1517 length = target_dict.pop("length") 

1518 hashes = target_dict.pop("hashes") 

1519 

1520 # All fields left in the target_dict are unrecognized. 

1521 return cls(length, hashes, path, target_dict) 

1522 

1523 def to_dict(self) -> dict[str, Any]: 

1524 """Return the JSON-serializable dictionary representation of self.""" 

1525 return { 

1526 "length": self.length, 

1527 "hashes": self.hashes, 

1528 **self.unrecognized_fields, 

1529 } 

1530 

1531 @classmethod 

1532 def from_file( 

1533 cls, 

1534 target_file_path: str, 

1535 local_path: str, 

1536 hash_algorithms: list[str] | None = None, 

1537 ) -> TargetFile: 

1538 """Create ``TargetFile`` object from a file. 

1539 

1540 Args: 

1541 target_file_path: URL path to a target file, relative to a base 

1542 targets URL. 

1543 local_path: Local path to target file content. 

1544 hash_algorithms: Hash algorithms to calculate hashes with. If not 

1545 specified the securesystemslib default hash algorithm is used. 

1546 

1547 Raises: 

1548 FileNotFoundError: The file doesn't exist. 

1549 ValueError: The hash algorithms list contains an unsupported 

1550 algorithm. 

1551 """ 

1552 with open(local_path, "rb") as file: 

1553 return cls.from_data(target_file_path, file, hash_algorithms) 

1554 

1555 @classmethod 

1556 def from_data( 

1557 cls, 

1558 target_file_path: str, 

1559 data: bytes | IO[bytes], 

1560 hash_algorithms: list[str] | None = None, 

1561 ) -> TargetFile: 

1562 """Create ``TargetFile`` object from bytes. 

1563 

1564 Args: 

1565 target_file_path: URL path to a target file, relative to a base 

1566 targets URL. 

1567 data: Target file content. 

1568 hash_algorithms: Hash algorithms to create the hashes with. If not 

1569 specified the securesystemslib default hash algorithm is used. 

1570 

1571 Raises: 

1572 ValueError: The hash algorithms list contains an unsupported 

1573 algorithm. 

1574 """ 

1575 length, hashes = cls._get_length_and_hashes(data, hash_algorithms) 

1576 return cls(length, hashes, target_file_path) 

1577 

1578 def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: 

1579 """Verify that length and hashes of ``data`` match expected values. 

1580 

1581 Args: 

1582 data: Target file object or its content in bytes. 

1583 

1584 Raises: 

1585 LengthOrHashMismatchError: Calculated length or hashes do not 

1586 match expected values or hash algorithm is not supported. 

1587 """ 

1588 self._verify_length(data, self.length) 

1589 self._verify_hashes(data, self.hashes) 

1590 

1591 def get_prefixed_paths(self) -> list[str]: 

1592 """ 

1593 Return hash-prefixed URL path fragments for the target file path. 

1594 """ 

1595 paths = [] 

1596 parent, sep, name = self.path.rpartition("/") 

1597 for hash_value in self.hashes.values(): 

1598 paths.append(f"{parent}{sep}{hash_value}.{name}") 

1599 

1600 return paths 

1601 
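
A sketch of building a ``TargetFile`` from in-memory bytes and deriving hash-prefixed paths; the path and content are arbitrary examples:

    from tuf.api.metadata import TargetFile

    content = b"hello world"
    target = TargetFile.from_data("files/hello.txt", content, ["sha256"])
    # A length or digest mismatch would raise LengthOrHashMismatchError.
    target.verify_length_and_hashes(content)
    print(target.get_prefixed_paths())  # ['files/<sha256 digest>.hello.txt']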

1602 

1603class Targets(Signed, _DelegatorMixin): 

1604 """A container for the signed part of targets metadata. 

1605 

1606 Targets contains verifying information about target files and also 

1607 delegates responsibility to other Targets roles. 

1608 

1609 *All parameters named below are not just constructor arguments but also 

1610 instance attributes.* 

1611 

1612 Args: 

1613 version: Metadata version number. Default is 1. 

1614 spec_version: Supported TUF specification version. Default is the 

1615 version currently supported by the library. 

1616 expires: Metadata expiry date. Default is current date and time. 

1617 targets: Dictionary of target filenames to TargetFiles. Default is an 

1618 empty dictionary. 

1619 delegations: Defines how this Targets delegates responsibility to other 

1620 Targets Metadata files. Default is None. 

1621 unrecognized_fields: Dictionary of all attributes that are not managed 

1622 by TUF Metadata API 

1623 

1624 Raises: 

1625 ValueError: Invalid arguments. 

1626 """ 

1627 

1628 type = _TARGETS 

1629 

1630 def __init__( 

1631 self, 

1632 version: int | None = None, 

1633 spec_version: str | None = None, 

1634 expires: datetime | None = None, 

1635 targets: dict[str, TargetFile] | None = None, 

1636 delegations: Delegations | None = None, 

1637 unrecognized_fields: dict[str, Any] | None = None, 

1638 ) -> None: 

1639 super().__init__(version, spec_version, expires, unrecognized_fields) 

1640 self.targets = targets if targets is not None else {} 

1641 self.delegations = delegations 

1642 

1643 def __eq__(self, other: object) -> bool: 

1644 if not isinstance(other, Targets): 

1645 return False 

1646 

1647 return ( 

1648 super().__eq__(other) 

1649 and self.targets == other.targets 

1650 and self.delegations == other.delegations 

1651 ) 

1652 

1653 @classmethod 

1654 def from_dict(cls, signed_dict: dict[str, Any]) -> Targets: 

1655 """Create ``Targets`` object from its json/dict representation. 

1656 

1657 Raises: 

1658 ValueError, KeyError, TypeError: Invalid arguments. 

1659 """ 

1660 common_args = cls._common_fields_from_dict(signed_dict) 

1661 targets = signed_dict.pop(_TARGETS) 

1662 try: 

1663 delegations_dict = signed_dict.pop("delegations") 

1664 except KeyError: 

1665 delegations = None 

1666 else: 

1667 delegations = Delegations.from_dict(delegations_dict) 

1668 res_targets = {} 

1669 for target_path, target_info in targets.items(): 

1670 res_targets[target_path] = TargetFile.from_dict( 

1671 target_info, target_path 

1672 ) 

1673 # All fields left in the targets_dict are unrecognized. 

1674 return cls(*common_args, res_targets, delegations, signed_dict) 

1675 

1676 def to_dict(self) -> dict[str, Any]: 

1677 """Return the dict representation of self.""" 

1678 targets_dict = self._common_fields_to_dict() 

1679 targets = {} 

1680 for target_path, target_file_obj in self.targets.items(): 

1681 targets[target_path] = target_file_obj.to_dict() 

1682 targets_dict[_TARGETS] = targets 

1683 if self.delegations is not None: 

1684 targets_dict["delegations"] = self.delegations.to_dict() 

1685 return targets_dict 

1686 

1687 def add_key(self, key: Key, role: str | None = None) -> None: 

1688 """Add new signing key for delegated role ``role``. 

1689 

1690 If succinct_roles is used then the ``role`` argument is not required. 

1691 

1692 Args: 

1693 key: Signing key to be added for ``role``. 

1694 role: Name of the role, for which ``key`` is added. 

1695 

1696 Raises: 

1697 ValueError: If the argument order is wrong, if there are no

1698 delegated roles, or if ``role`` is not delegated by this ``Targets``.

1699 """ 

1700 # Verify that our users are not using the old argument order. 

1701 if isinstance(role, Key): 

1702 raise ValueError("Role must be a string, not a Key instance") 

1703 

1704 if self.delegations is None: 

1705 raise ValueError(f"Delegated role {role} doesn't exist") 

1706 

1707 if self.delegations.roles is not None: 

1708 if role not in self.delegations.roles: 

1709 raise ValueError(f"Delegated role {role} doesn't exist") 

1710 if key.keyid not in self.delegations.roles[role].keyids: 

1711 self.delegations.roles[role].keyids.append(key.keyid) 

1712 

1713 elif self.delegations.succinct_roles is not None: 

1714 if key.keyid not in self.delegations.succinct_roles.keyids: 

1715 self.delegations.succinct_roles.keyids.append(key.keyid) 

1716 

1717 self.delegations.keys[key.keyid] = key 

1718 

1719 def revoke_key(self, keyid: str, role: str | None = None) -> None: 

1720 """Revokes key from delegated role ``role`` and updates the delegations 

1721 key store. 

1722 

1723 If succinct_roles is used then the ``role`` argument is not required. 

1724 

1725 Args: 

1726 keyid: Identifier of the key to be removed for ``role``. 

1727 role: Name of the role, for which a signing key is removed. 

1728 

1729 Raises: 

1730 ValueError: If there are no delegated roles, if ``role`` is not

1731 delegated by this ``Targets``, if the key is not used by ``role``,

1732 or if the key with id ``keyid`` is not used by succinct roles.

1733 """ 

1734 if self.delegations is None: 

1735 raise ValueError(f"Delegated role {role} doesn't exist") 

1736 

1737 if self.delegations.roles is not None: 

1738 if role not in self.delegations.roles: 

1739 raise ValueError(f"Delegated role {role} doesn't exist") 

1740 if keyid not in self.delegations.roles[role].keyids: 

1741 raise ValueError(f"Key with id {keyid} is not used by {role}") 

1742 

1743 self.delegations.roles[role].keyids.remove(keyid) 

1744 for keyinfo in self.delegations.roles.values(): 

1745 if keyid in keyinfo.keyids: 

1746 return 

1747 

1748 elif self.delegations.succinct_roles is not None: 

1749 if keyid not in self.delegations.succinct_roles.keyids: 

1750 raise ValueError( 

1751 f"Key with id {keyid} is not used by succinct_roles" 

1752 ) 

1753 

1754 self.delegations.succinct_roles.keyids.remove(keyid) 

1755 

1756 del self.delegations.keys[keyid] 

1757 

1758 def get_delegated_role(self, delegated_role: str) -> Role: 

1759 """Return the role object for the given delegated role. 

1760 

1761 Raises ValueError if delegated_role is not actually delegated. 

1762 """ 

1763 if self.delegations is None: 

1764 raise ValueError("No delegations found") 

1765 

1766 role: Role | None = None 

1767 if self.delegations.roles is not None: 

1768 role = self.delegations.roles.get(delegated_role) 

1769 elif self.delegations.succinct_roles is not None: 

1770 succinct = self.delegations.succinct_roles 

1771 if succinct.is_delegated_role(delegated_role): 

1772 role = succinct 

1773 

1774 if not role: 

1775 raise ValueError(f"Delegated role {delegated_role} not found") 

1776 

1777 return role 

1778 

1779 def get_key(self, keyid: str) -> Key: 

1780 if self.delegations is None: 

1781 raise ValueError("No delegations found") 

1782 if keyid not in self.delegations.keys: 

1783 raise ValueError(f"Key {keyid} not found") 

1784 

1785 return self.delegations.keys[keyid]