1# Copyright the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
3
4
5"""Helper classes for low-level Metadata API."""
6
7from __future__ import annotations
8
9import abc
10import fnmatch
11import io
12import logging
13from dataclasses import dataclass
14from datetime import datetime, timezone
15from typing import (
16 IO,
17 TYPE_CHECKING,
18 Any,
19 ClassVar,
20 TypeVar,
21)
22
23from securesystemslib import exceptions as sslib_exceptions
24from securesystemslib import hash as sslib_hash
25from securesystemslib.signer import Key, Signature
26
27from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
28
29if TYPE_CHECKING:
30 from collections.abc import Iterator
31
# Names of the four top-level TUF roles.
_ROOT = "root"
_SNAPSHOT = "snapshot"
_TARGETS = "targets"
_TIMESTAMP = "timestamp"

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
# The version is kept as a list of its dot-separated string components.
SPECIFICATION_VERSION = ["1", "0", "31"]
# Set containing the names of all top-level roles.
TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# T is a Generic type constraint for container payloads: it is restricted to
# the four payload classes, given here as string forward references.
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
46
47
class Signed(metaclass=abc.ABCMeta):
    """A base class for the signed part of TUF metadata.

    Objects with base class Signed are usually included in a ``Metadata`` object
    on the signed attribute. This class provides attributes and methods that
    are common for all TUF metadata types (roles).

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. If None, then 1 is assigned.
        spec_version: Supported TUF specification version. If None, then the
            version currently supported by the library is assigned.
        expires: Metadata expiry date in UTC timezone. If None, then current
            date and time is assigned.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    # type is required for static reference without changing the API
    type: ClassVar[str] = "signed"

    # _type and type are identical: 1st replicates file format, 2nd passes lint
    @property
    def _type(self) -> str:
        return self.type

    @property
    def expires(self) -> datetime:
        """Get the metadata expiry date."""
        return self._expires

    @expires.setter
    def expires(self, value: datetime) -> None:
        """Set the metadata expiry date.

        Microseconds are dropped. A naive datetime is interpreted as UTC; an
        aware datetime must already be in UTC.

        # Use 'datetime' module to e.g. expire in seven days from now
        obj.expires = datetime.now(timezone.utc) + timedelta(days=7)

        Raises:
            ValueError: ``value`` is timezone-aware but not UTC. The
                previously stored expiry date is left untouched.
        """
        normalized = value.replace(microsecond=0)
        if normalized.tzinfo is None:
            # Naive datetime: just make it UTC
            normalized = normalized.replace(tzinfo=timezone.utc)
        elif normalized.tzinfo != timezone.utc:
            raise ValueError(f"Expected tz UTC, not {normalized.tzinfo}")

        # Only store the value once it is known to be valid, so that a
        # rejected assignment cannot corrupt the existing expiry date.
        self._expires = normalized

    # NOTE: Signed is a stupid name, because this might not be signed yet, but
    # we keep it to match spec terminology (I often refer to this as "payload",
    # or "inner metadata")
    def __init__(
        self,
        version: int | None,
        spec_version: str | None,
        expires: datetime | None,
        unrecognized_fields: dict[str, Any] | None,
    ):
        if spec_version is None:
            spec_version = ".".join(SPECIFICATION_VERSION)
        # Accept semver (X.Y.Z) but also X.Y for legacy compatibility
        spec_list = spec_version.split(".")
        if len(spec_list) not in [2, 3] or not all(
            el.isdigit() for el in spec_list
        ):
            raise ValueError(f"Failed to parse spec_version {spec_version}")

        # major version must match
        if spec_list[0] != SPECIFICATION_VERSION[0]:
            raise ValueError(f"Unsupported spec_version {spec_version}")

        self.spec_version = spec_version

        # Goes through the ``expires`` property setter, which normalizes and
        # validates the value.
        self.expires = expires or datetime.now(timezone.utc)

        if version is None:
            version = 1
        elif version <= 0:
            raise ValueError(f"version must be > 0, got {version}")
        self.version = version

        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Signed):
            return False

        return (
            self.type == other.type
            and self.version == other.version
            and self.spec_version == other.spec_version
            and self.expires == other.expires
            and self.unrecognized_fields == other.unrecognized_fields
        )

    @abc.abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Serialize and return a dict representation of self."""
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Signed:
        """Deserialization helper, creates object from json/dict
        representation.
        """
        raise NotImplementedError

    @classmethod
    def _common_fields_from_dict(
        cls, signed_dict: dict[str, Any]
    ) -> tuple[int, str, datetime]:
        """Remove the common fields of ``Signed`` instances from the passed
        dict representation and return them as an ordered tuple
        ``(version, spec_version, expires)``, to be passed as leading
        positional arguments to a subclass constructor.

        The common fields are popped from ``signed_dict``, i.e. the passed
        dict is modified.

        See ``{Root, Timestamp, Snapshot, Targets}.from_dict``
        methods for usage.

        Raises:
            ValueError: The ``_type`` field does not match the class type, or
                ``expires`` does not have the expected format.
            KeyError: A common field is missing.
        """
        _type = signed_dict.pop("_type")
        if _type != cls.type:
            raise ValueError(f"Expected type {cls.type}, got {_type}")

        version = signed_dict.pop("version")
        spec_version = signed_dict.pop("spec_version")
        expires_str = signed_dict.pop("expires")
        # Convert 'expires' TUF metadata string to a datetime object, which is
        # what the constructor expects and what we store. The inverse operation
        # is implemented in '_common_fields_to_dict'.
        expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ").replace(
            tzinfo=timezone.utc
        )

        return version, spec_version, expires

    def _common_fields_to_dict(self) -> dict[str, Any]:
        """Return a dict representation of common fields of
        ``Signed`` instances.

        Unrecognized fields are merged in after the common fields.

        See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage.

        """
        return {
            "_type": self._type,
            "version": self.version,
            "spec_version": self.spec_version,
            "expires": self.expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
            **self.unrecognized_fields,
        }

    def is_expired(self, reference_time: datetime | None = None) -> bool:
        """Check metadata expiration against a reference time.

        Args:
            reference_time: Time to check expiration date against. Either a
                timezone-aware datetime or a naive datetime, which is
                interpreted as UTC. Default is current UTC date and time.

        Returns:
            ``True`` if expiration time is less than or equal to the
            reference time.
        """
        if reference_time is None:
            reference_time = datetime.now(timezone.utc)
        elif reference_time.tzinfo is None:
            # The stored expiry date is always timezone-aware; comparing it
            # with a naive datetime would raise TypeError.
            reference_time = reference_time.replace(tzinfo=timezone.utc)

        return reference_time >= self.expires
218
219
class Role:
    """Container describing who may sign a role's metadata.

    A ``Role`` holds the identifiers of the accepted signing keys together
    with the number of those keys that must sign for the metadata to count
    as successfully signed.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keyids: Roles signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        # Duplicates collapse when the list is turned into a set.
        distinct_count = len(set(keyids))
        if distinct_count != len(keyids):
            raise ValueError(f"Nonunique keyids: {keyids}")
        if threshold < 1:
            raise ValueError("threshold should be at least 1!")

        self.keyids = keyids
        self.threshold = threshold
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Role):
            return False

        compared = ("keyids", "threshold", "unrecognized_fields")
        return all(
            getattr(self, name) == getattr(other, name) for name in compared
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> Role:
        """Create ``Role`` object from its json/dict representation.

        The known fields are popped from ``role_dict``; whatever remains is
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        known = [role_dict.pop(name) for name in ("keyids", "threshold")]
        # All fields left in the role_dict are unrecognized.
        return cls(*known, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        serialized: dict[str, Any] = {
            "keyids": self.keyids,
            "threshold": self.threshold,
        }
        serialized.update(self.unrecognized_fields)
        return serialized
285
286
@dataclass
class VerificationResult:
    """Signature verification result for delegated role metadata.

    An instance is truthy exactly when ``verified`` is true.

    Attributes:
        threshold: Number of required signatures.
        signed: dict of keyid to Key, containing keys that have signed.
        unsigned: dict of keyid to Key, containing keys that have not signed.
    """

    threshold: int
    signed: dict[str, Key]
    unsigned: dict[str, Key]

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if threshold of signatures is met."""
        return self.threshold <= len(self.signed)

    @property
    def missing(self) -> int:
        """Number of additional signatures required to reach threshold."""
        shortfall = self.threshold - len(self.signed)
        # Never report a negative number once the threshold is exceeded.
        return shortfall if shortfall > 0 else 0
313
314
@dataclass
class RootVerificationResult:
    """Signature verification result for root metadata.

    Root must be verified by itself and the previous root version. This
    dataclass represents both results. For the edge case of first version
    of root, these underlying results are identical.

    Note that `signed` and `unsigned` correctness requires the underlying
    VerificationResult keys to not conflict (no reusing the same keyid for
    different keys).

    An instance is truthy exactly when ``verified`` is true.

    Attributes:
        first: First underlying VerificationResult
        second: Second underlying VerificationResult
    """

    first: VerificationResult
    second: VerificationResult

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if threshold of signatures is met in both underlying
        VerificationResults.
        """
        outcome = self.first.verified
        if outcome:
            # Only consult the second result when the first one passed.
            outcome = self.second.verified
        return outcome

    @property
    def signed(self) -> dict[str, Key]:
        """Dictionary of all signing keys that have signed, from both
        VerificationResults.
        """
        # Union of both results; on a keyid clash the entry from ``second``
        # wins. A copy is made so neither underlying dict is modified.
        combined = dict(self.first.signed)
        combined.update(self.second.signed)
        return combined

    @property
    def unsigned(self) -> dict[str, Key]:
        """Dictionary of all signing keys that have not signed, from both
        VerificationResults.
        """
        # Union of both results; on a keyid clash the entry from ``second``
        # wins. A copy is made so neither underlying dict is modified.
        combined = dict(self.first.unsigned)
        combined.update(self.second.unsigned)
        return combined
362
363
class _DelegatorMixin(metaclass=abc.ABCMeta):
    """Class that implements verify_delegate() for Root and Targets"""

    @abc.abstractmethod
    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        raise NotImplementedError

    def get_verification_result(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> VerificationResult:
        """Return signature threshold verification result for delegated role.

        Every keyid of the delegated role for which a key is available ends
        up either in the result's ``signed`` dict (a signature exists and
        verifies) or in its ``unsigned`` dict (signature missing or not
        verifiable). Keyids without an available key are logged and skipped.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        role metadata is not fully verified.

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``delegated_role``.
        """
        role = self.get_delegated_role(delegated_role)

        verified_keys: dict[str, Key] = {}
        unverified_keys: dict[str, Key] = {}

        for keyid in role.keyids:
            try:
                key = self.get_key(keyid)
            except ValueError:
                logger.info("No key for keyid %s", keyid)
                continue

            if keyid in signatures:
                try:
                    key.verify_signature(signatures[keyid], payload)
                except sslib_exceptions.UnverifiedSignatureError:
                    unverified_keys[keyid] = key
                    logger.info(
                        "Key %s failed to verify %s", keyid, delegated_role
                    )
                else:
                    verified_keys[keyid] = key
            else:
                unverified_keys[keyid] = key
                logger.info("No signature for keyid %s", keyid)

        return VerificationResult(
            role.threshold, verified_keys, unverified_keys
        )

    def verify_delegate(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> None:
        """Verify signature threshold for delegated role.

        Verify that there are enough valid ``signatures`` over ``payload``, to
        meet the threshold of keys for ``delegated_role``, as defined by the
        delegator (``self``).

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            UnsignedMetadataError: ``delegated_role`` was not signed with
                required threshold of keys.
            ValueError: no delegation was found for ``delegated_role``.
        """
        outcome = self.get_verification_result(
            delegated_role, payload, signatures
        )
        # The result object is truthy when the threshold has been met.
        if outcome:
            return

        raise UnsignedMetadataError(
            f"{delegated_role} was signed by {len(outcome.signed)}/"
            f"{outcome.threshold} keys"
        )
459
460
class Root(Signed, _DelegatorMixin):
    """A container for the signed part of root metadata.

    Parameters listed below are also instance attributes.

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
            Default is empty dictionary.
        roles: Dictionary of role names to Roles. Defines which keys are
            required to sign the metadata for a specific role. Default is
            a dictionary of top level roles without keys and threshold of 1.
        consistent_snapshot: ``True`` if repository supports consistent
            snapshots. Default is True.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _ROOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        keys: dict[str, Key] | None = None,
        roles: dict[str, Role] | None = None,
        consistent_snapshot: bool | None = True,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.consistent_snapshot = consistent_snapshot
        self.keys = keys if keys is not None else {}

        if roles is None:
            # Sort the names so the default role order is reproducible;
            # iterating the set directly depends on string hash randomization.
            roles = {r: Role([], 1) for r in sorted(TOP_LEVEL_ROLE_NAMES)}
        elif set(roles) != TOP_LEVEL_ROLE_NAMES:
            raise ValueError("Role names must be the top-level metadata roles")
        self.roles = roles

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Root):
            return False

        return (
            super().__eq__(other)
            and self.keys == other.keys
            and self.roles == other.roles
            and self.consistent_snapshot == other.consistent_snapshot
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Root:
        """Create ``Root`` object from its json/dict representation.

        Known fields are popped from ``signed_dict``; whatever remains is
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        consistent_snapshot = signed_dict.pop("consistent_snapshot", None)
        keys = signed_dict.pop("keys")
        roles = signed_dict.pop("roles")

        # Replace the dict representations with objects in place. Only the
        # values of existing keys are reassigned, which is safe while
        # iterating.
        for keyid, key_dict in keys.items():
            keys[keyid] = Key.from_dict(keyid, key_dict)
        for role_name, role_dict in roles.items():
            roles[role_name] = Role.from_dict(role_dict)

        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, keys, roles, consistent_snapshot, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        root_dict = self._common_fields_to_dict()
        keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()}
        roles = {
            role_name: role.to_dict()
            for role_name, role in self.roles.items()
        }
        # consistent_snapshot is left out of the output when it is None.
        if self.consistent_snapshot is not None:
            root_dict["consistent_snapshot"] = self.consistent_snapshot

        root_dict.update(
            {
                "keys": keys,
                "roles": roles,
            }
        )
        return root_dict

    def add_key(self, key: Key, role: str) -> None:
        """Add new signing key for delegated role ``role``.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if ``role`` doesn't
                exist.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if key.keyid not in self.roles[role].keyids:
            self.roles[role].keyids.append(key.keyid)
        self.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str) -> None:
        """Revoke key from ``role`` and updates the key store.

        The key is removed from the key store only if no other role still
        uses it.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If ``role`` doesn't exist or if ``role`` doesn't include
                the key.
        """
        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if keyid not in self.roles[role].keyids:
            raise ValueError(f"Key with id {keyid} is not used by {role}")
        self.roles[role].keyids.remove(keyid)

        # Keep the key while any role still references it.
        if any(keyid in keyinfo.keyids for keyinfo in self.roles.values()):
            return

        # A role may list a keyid that has no entry in the key store. Using
        # pop() with a default avoids an undocumented KeyError after the role
        # has already been modified.
        self.keys.pop(keyid, None)

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if delegated_role not in self.roles:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return self.roles[delegated_role]

    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        if keyid not in self.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.keys[keyid]

    def get_root_verification_result(
        self,
        previous: Root | None,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> RootVerificationResult:
        """Return signature threshold verification result for two root roles.

        Verify root metadata with two roles (`self` and optionally `previous`).

        If the repository has no root role versions yet, `previous` can be left
        None. In all other cases, `previous` must be the previous version of
        the Root.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        root metadata is not fully verified.

        Args:
            previous: The previous `Root` to verify payload with, or None
            payload: Signed payload bytes for root
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``root`` or given Root
                versions are not sequential.
        """

        if previous is None:
            # No previous root: verify against self for both results.
            previous = self
        elif self.version != previous.version + 1:
            versions = f"v{previous.version} and v{self.version}"
            raise ValueError(
                f"Expected sequential root versions, got {versions}."
            )

        return RootVerificationResult(
            previous.get_verification_result(Root.type, payload, signatures),
            self.get_verification_result(Root.type, payload, signatures),
        )
654
655
class BaseFile:
    """A base class of ``MetaFile`` and ``TargetFile``.

    Bundles the static helpers used to validate, compute and verify lengths
    and hashes.
    """

    @staticmethod
    def _verify_hashes(
        data: bytes | IO[bytes], expected_hashes: dict[str, str]
    ) -> None:
        """Verify that the hash of ``data`` matches ``expected_hashes``.

        Raises:
            LengthOrHashMismatchError: An algorithm is not supported or an
                observed hash differs from the expected one.
        """
        data_is_bytes = isinstance(data, bytes)
        for algo, exp_hash in expected_hashes.items():
            try:
                if not data_is_bytes:
                    # if data is not bytes, assume it is a file object
                    hasher = sslib_hash.digest_fileobject(data, algo)
                else:
                    hasher = sslib_hash.digest(algo)
                    hasher.update(data)
            except (
                sslib_exceptions.UnsupportedAlgorithmError,
                sslib_exceptions.FormatError,
            ) as e:
                raise LengthOrHashMismatchError(
                    f"Unsupported algorithm '{algo}'"
                ) from e

            observed_hash = hasher.hexdigest()
            if observed_hash == exp_hash:
                continue

            raise LengthOrHashMismatchError(
                f"Observed hash {observed_hash} does not match "
                f"expected hash {exp_hash}"
            )

    @staticmethod
    def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None:
        """Verify that the length of ``data`` matches ``expected_length``.

        Raises:
            LengthOrHashMismatchError: The observed length differs from
                ``expected_length``.
        """
        if not isinstance(data, bytes):
            # if data is not bytes, assume it is a file object: the offset
            # reported after seeking to the end is used as its length
            data.seek(0, io.SEEK_END)
            observed_length = data.tell()
        else:
            observed_length = len(data)

        if observed_length == expected_length:
            return

        raise LengthOrHashMismatchError(
            f"Observed length {observed_length} does not match "
            f"expected length {expected_length}"
        )

    @staticmethod
    def _validate_hashes(hashes: dict[str, str]) -> None:
        """Check that ``hashes`` is a non-empty dict of strings to strings.

        Raises:
            ValueError: ``hashes`` is empty.
            TypeError: A key or a value is not a string.
        """
        if not hashes:
            raise ValueError("Hashes must be a non empty dictionary")

        has_non_string = any(
            not isinstance(algo, str) or not isinstance(digest, str)
            for algo, digest in hashes.items()
        )
        if has_non_string:
            raise TypeError("Hashes items must be strings")

    @staticmethod
    def _validate_length(length: int) -> None:
        """Check that ``length`` is not negative.

        Raises:
            ValueError: ``length`` is below zero.
        """
        if length < 0:
            raise ValueError(f"Length must be >= 0, got {length}")

    @staticmethod
    def _get_length_and_hashes(
        data: bytes | IO[bytes], hash_algorithms: list[str] | None
    ) -> tuple[int, dict[str, str]]:
        """Calculate length and hashes of ``data``.

        Args:
            data: File object or its content in bytes.
            hash_algorithms: Algorithms to hash with. If None, the
                securesystemslib default hash algorithm is used.

        Returns:
            Tuple of the length and a dict of algorithm name to hex digest.

        Raises:
            ValueError: An algorithm is not supported.
        """
        data_is_bytes = isinstance(data, bytes)
        if data_is_bytes:
            length = len(data)
        else:
            # File object: the end-of-file offset is its length.
            data.seek(0, io.SEEK_END)
            length = data.tell()

        algorithms = (
            [sslib_hash.DEFAULT_HASH_ALGORITHM]
            if hash_algorithms is None
            else hash_algorithms
        )

        hashes: dict[str, str] = {}
        for algorithm in algorithms:
            try:
                if data_is_bytes:
                    hasher = sslib_hash.digest(algorithm)
                    hasher.update(data)
                else:
                    hasher = sslib_hash.digest_fileobject(data, algorithm)
            except (
                sslib_exceptions.UnsupportedAlgorithmError,
                sslib_exceptions.FormatError,
            ) as e:
                raise ValueError(f"Unsupported algorithm '{algorithm}'") from e

            hashes[algorithm] = hasher.hexdigest()

        return (length, hashes)
754
755
class MetaFile(BaseFile):
    """A container with information about a particular metadata file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Version of the metadata file.
        length: Length of the metadata file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the metadata
            file content.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        version: int = 1,
        length: int | None = None,
        hashes: dict[str, str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if version <= 0:
            raise ValueError(f"Metafile version must be > 0, got {version}")
        # length and hashes are optional and only validated when given.
        if length is not None:
            self._validate_length(length)
        if hashes is not None:
            self._validate_hashes(hashes)

        self.version = version
        self.length = length
        self.hashes = hashes
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, MetaFile):
            return False

        return (
            self.version == other.version
            and self.length == other.length
            and self.hashes == other.hashes
            and self.unrecognized_fields == other.unrecognized_fields
        )

    @classmethod
    def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile:
        """Create ``MetaFile`` object from its json/dict representation.

        Known fields are popped from ``meta_dict``; whatever remains is
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        version = meta_dict.pop("version")
        length = meta_dict.pop("length", None)
        hashes = meta_dict.pop("hashes", None)

        # All fields left in the meta_dict are unrecognized.
        return cls(version, length, hashes, meta_dict)

    @classmethod
    def from_data(
        cls,
        version: int,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> MetaFile:
        """Creates MetaFile object from bytes.

        This constructor should only be used if hashes are wanted.
        By default, MetaFile(ver) should be used.

        Args:
            version: Version of the metadata file.
            data: Metadata bytes that the metafile represents.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, the securesystemslib default hash algorithm is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        # ``None`` is passed through; the helper substitutes the default
        # algorithm in that case.
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(version, length, hashes)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self.

        ``length`` and ``hashes`` are only included when they are set.
        """
        res_dict: dict[str, Any] = {
            "version": self.version,
            **self.unrecognized_fields,
        }

        if self.length is not None:
            res_dict["length"] = self.length

        if self.hashes is not None:
            res_dict["hashes"] = self.hashes

        return res_dict

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that the length and hashes of ``data`` match expected values.

        A check is skipped when the corresponding expected value is not set.

        Args:
            data: File object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        if self.length is not None:
            self._verify_length(data, self.length)

        if self.hashes is not None:
            self._verify_hashes(data, self.hashes)
874
875
class Timestamp(Signed):
    """A container for the signed part of timestamp metadata.

    TUF file format uses a dictionary to contain the snapshot information:
    this is not the case with ``Timestamp.snapshot_meta`` which is a
    ``MetaFile``.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        snapshot_meta: Meta information for snapshot metadata. Default is a
            MetaFile with version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TIMESTAMP

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        snapshot_meta: MetaFile | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        if not snapshot_meta:
            # Nothing given: fall back to a version 1 MetaFile.
            snapshot_meta = MetaFile(1)
        self.snapshot_meta = snapshot_meta

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Timestamp):
            return False

        if not super().__eq__(other):
            return False
        return self.snapshot_meta == other.snapshot_meta

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp:
        """Create ``Timestamp`` object from its json/dict representation.

        Known fields are popped from ``signed_dict``; whatever remains is
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        snapshot_info = signed_dict.pop("meta")["snapshot.json"]
        snapshot_meta = MetaFile.from_dict(snapshot_info)
        # All fields left in the timestamp_dict are unrecognized.
        return cls(*common_args, snapshot_meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        serialized = self._common_fields_to_dict()
        # The file format wraps the snapshot information in a "meta" dict.
        snapshot_entry = self.snapshot_meta.to_dict()
        serialized["meta"] = {"snapshot.json": snapshot_entry}
        return serialized
939
940
class Snapshot(Signed):
    """A container for the signed part of snapshot metadata.

    Snapshot contains information about all target Metadata files.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        meta: Dictionary of targets filenames to ``MetaFile`` objects. Default
            is a dictionary with a Metafile for "targets.json" version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _SNAPSHOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        meta: dict[str, MetaFile] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        if meta is None:
            # Only None triggers the default; an empty dict is kept as given.
            meta = {"targets.json": MetaFile(1)}
        self.meta = meta

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Snapshot):
            return False

        if not super().__eq__(other):
            return False
        return self.meta == other.meta

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot:
        """Create ``Snapshot`` object from its json/dict representation.

        Known fields are popped from ``signed_dict``; whatever remains is
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        raw_meta = signed_dict.pop("meta")
        meta = {
            meta_path: MetaFile.from_dict(meta_dict)
            for meta_path, meta_dict in raw_meta.items()
        }
        # All fields left in the snapshot_dict are unrecognized.
        return cls(*common_args, meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        serialized = self._common_fields_to_dict()
        serialized["meta"] = {
            meta_path: meta_info.to_dict()
            for meta_path, meta_info in self.meta.items()
        }
        return serialized
1006
1007
class DelegatedRole(Role):
    """A container with information about a delegated role.

    A delegation can happen in two ways:

        - ``paths`` is set: delegates targets matching any path pattern in
          ``paths``
        - ``path_hash_prefixes`` is set: delegates targets whose target path
          hash starts with any of the prefixes in ``path_hash_prefixes``

        ``paths`` and ``path_hash_prefixes`` are mutually exclusive:
        both cannot be set, at least one of them must be set.

    Path patterns are matched case-sensitively, one "/"-separated path
    component at a time, with the same result on every platform.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        name: Delegated role name.
        keyids: Delegated role signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        terminating: ``True`` if this delegation terminates a target lookup.
        paths: Path patterns. See note above.
        path_hash_prefixes: Hash prefixes. See note above.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        name: str,
        keyids: list[str],
        threshold: int,
        terminating: bool,
        paths: list[str] | None = None,
        path_hash_prefixes: list[str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(keyids, threshold, unrecognized_fields)
        self.name = name
        self.terminating = terminating

        # Exactly one of the two delegation styles has to be chosen.
        exclusive_vars = [paths, path_hash_prefixes]
        if sum(1 for var in exclusive_vars if var is not None) != 1:
            raise ValueError(
                "Only one of (paths, path_hash_prefixes) must be set"
            )

        if paths is not None and any(not isinstance(p, str) for p in paths):
            raise ValueError("Paths must be strings")
        if path_hash_prefixes is not None and any(
            not isinstance(p, str) for p in path_hash_prefixes
        ):
            raise ValueError("Path_hash_prefixes must be strings")

        self.paths = paths
        self.path_hash_prefixes = path_hash_prefixes

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DelegatedRole):
            return False

        return (
            super().__eq__(other)
            and self.name == other.name
            and self.terminating == other.terminating
            and self.paths == other.paths
            and self.path_hash_prefixes == other.path_hash_prefixes
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole:
        """Create ``DelegatedRole`` object from its json/dict representation.

        Recognized entries are popped from ``role_dict``; the dict that
        remains is passed to the constructor as the unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        name = role_dict.pop("name")
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        terminating = role_dict.pop("terminating")
        paths = role_dict.pop("paths", None)
        path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
        # All fields left in the role_dict are unrecognized.
        return cls(
            name,
            keyids,
            threshold,
            terminating,
            paths,
            path_hash_prefixes,
            role_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        res_dict = {
            "name": self.name,
            "terminating": self.terminating,
            **base_role_dict,
        }
        if self.paths is not None:
            res_dict["paths"] = self.paths
        elif self.path_hash_prefixes is not None:
            res_dict["path_hash_prefixes"] = self.path_hash_prefixes
        return res_dict

    @staticmethod
    def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool:
        """Determine whether ``targetpath`` matches the ``pathpattern``.

        Both strings are split on "/" and compared component by component
        with ``fnmatch.fnmatchcase``. The match is case-sensitive and does
        not depend on the operating system.
        """
        # We need to make sure that targetpath and pathpattern are pointing to
        # the same directory as fnmatch doesn't treat "/" as a special symbol.
        target_parts = targetpath.split("/")
        pattern_parts = pathpattern.split("/")
        if len(target_parts) != len(pattern_parts):
            return False

        # Every part in the pathpattern could include a glob pattern, that's
        # why each of the target and pathpattern parts should match.
        # fnmatchcase() is used instead of fnmatch(): the latter normalizes
        # both arguments with os.path.normcase(), which would make delegation
        # matching case-insensitive on Windows.
        return all(
            fnmatch.fnmatchcase(target_dir, pattern_dir)
            for target_dir, pattern_dir in zip(target_parts, pattern_parts)
        )

    def is_delegated_path(self, target_filepath: str) -> bool:
        """Determine whether the given ``target_filepath`` is in one of
        the paths that ``DelegatedRole`` is trusted to provide.

        The ``target_filepath`` and the ``DelegatedRole`` paths are expected to
        be in their canonical forms, so e.g. "a/b" instead of "a//b" . Only "/"
        is supported as target path separator. Leading separators are not
        handled as special cases (see `TUF specification on targetpath
        <https://theupdateframework.github.io/specification/latest/#targetpath>`_).

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.

        Returns:
            ``True`` if the sha256 hex digest of ``target_filepath`` starts
            with one of ``path_hash_prefixes``, or (when ``paths`` is used)
            if ``target_filepath`` matches one of the path patterns.
        """
        if self.path_hash_prefixes is not None:
            # Calculate the hash of the filepath
            # to determine in which bin to find the target.
            digest_object = sslib_hash.digest(algorithm="sha256")
            digest_object.update(target_filepath.encode("utf-8"))
            target_filepath_hash = digest_object.hexdigest()

            return any(
                target_filepath_hash.startswith(path_hash_prefix)
                for path_hash_prefix in self.path_hash_prefixes
            )

        if self.paths is not None:
            # A delegated role path may be an explicit path or glob
            # pattern (Unix shell-style wildcards).
            return any(
                self._is_target_in_pathpattern(target_filepath, pathpattern)
                for pathpattern in self.paths
            )

        return False
1169
1170
class SuccinctRoles(Role):
    """Succinctly defines a hash bin delegation graph.

    A ``SuccinctRoles`` object describes a delegation graph that covers all
    targets, distributing them uniformly over the delegated roles (i.e. bins)
    in the graph.

    The total number of bins is 2 to the power of the passed ``bit_length``.

    Bin names are the concatenation of the passed ``name_prefix`` and a
    zero-padded, lower-case hex representation of the bin index separated by
    a hyphen.

    The passed ``keyids`` and ``threshold`` is used for each bin, and each bin
    is 'terminating'.

    For details: https://github.com/theupdateframework/taps/blob/master/tap15.md

    Args:
        keyids: Signing key identifiers for any bin metadata.
        threshold: Number of keys required to sign any bin metadata.
        bit_length: Number of bits between 1 and 32.
        name_prefix: Prefix of all bin names.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError, TypeError, AttributeError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        bit_length: int,
        name_prefix: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(keyids, threshold, unrecognized_fields)

        if bit_length <= 0 or bit_length > 32:
            raise ValueError("bit_length must be between 1 and 32")
        if not isinstance(name_prefix, str):
            raise ValueError("name_prefix must be a string")

        self.bit_length = bit_length
        self.name_prefix = name_prefix

        # Calculate the suffix_len value based on the total number of bins in
        # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will
        # have a suffix between "000" and "3ff" in hex and suffix_len will be 3
        # meaning the third bin will have a suffix of "003".
        self.number_of_bins = 2**bit_length
        # suffix_len is calculated based on "number_of_bins - 1" as the name
        # of the last bin contains the number "number_of_bins -1" as a suffix.
        self.suffix_len = len(f"{self.number_of_bins - 1:x}")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SuccinctRoles):
            return False

        return (
            super().__eq__(other)
            and self.bit_length == other.bit_length
            and self.name_prefix == other.name_prefix
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles:
        """Create ``SuccinctRoles`` object from its json/dict representation.

        Recognized entries are popped from ``role_dict``; the dict that
        remains is passed to the constructor as the unrecognized fields.

        Raises:
            ValueError, KeyError, AttributeError, TypeError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        bit_length = role_dict.pop("bit_length")
        name_prefix = role_dict.pop("name_prefix")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, bit_length, name_prefix, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        return {
            "bit_length": self.bit_length,
            "name_prefix": self.name_prefix,
            **base_role_dict,
        }

    def get_role_for_target(self, target_filepath: str) -> str:
        """Calculate the name of the delegated role responsible for
        ``target_filepath``.

        The target at path ``target_filepath`` is assigned to a bin by casting
        the left-most ``bit_length`` of bits of the file path hash digest to
        int, using it as bin index between 0 and ``2**bit_length - 1``.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        hasher = sslib_hash.digest(algorithm="sha256")
        hasher.update(target_filepath.encode("utf-8"))

        # We can't ever need more than 4 bytes (32 bits).
        hash_bytes = hasher.digest()[:4]
        # Right shift hash bytes, so that we only have the leftmost
        # bit_length bits that we care about.
        shift_value = 32 - self.bit_length
        bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value
        # Add zero padding if necessary and cast to hex the suffix.
        suffix = f"{bin_number:0{self.suffix_len}x}"
        return f"{self.name_prefix}-{suffix}"

    def get_roles(self) -> Iterator[str]:
        """Yield the names of all different delegated roles one by one."""
        for i in range(self.number_of_bins):
            suffix = f"{i:0{self.suffix_len}x}"
            yield f"{self.name_prefix}-{suffix}"

    def is_delegated_role(self, role_name: str) -> bool:
        """Determine whether the given ``role_name`` is in one of
        the delegated roles that ``SuccinctRoles`` represents.

        Only the exact spelling produced by ``get_roles()`` is accepted:
        ``name_prefix``, a hyphen and ``suffix_len`` lower-case hex digits
        whose value is below ``number_of_bins``.

        Args:
            role_name: The name of the role to check against.
        """
        desired_prefix = self.name_prefix + "-"

        if not role_name.startswith(desired_prefix):
            return False

        suffix = role_name[len(desired_prefix) :]
        if len(suffix) != self.suffix_len:
            return False

        # int(suffix, 16) on its own is too lenient: it also parses a sign
        # ("+f", "-0"), surrounding whitespace, a "0x" prefix, "_" digit
        # separators and upper-case digits, none of which appear in the
        # names generated for the bins.
        if any(char not in "0123456789abcdef" for char in suffix):
            return False

        return int(suffix, 16) < self.number_of_bins
1314
1315
class Delegations:
    """A container object storing information about all delegations.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
        roles: Ordered dictionary of role names to DelegatedRoles instances. It
            defines which keys are required to sign the metadata for a specific
            role. The roles order also defines the order that role delegations
            are considered during target searches.
        succinct_roles: Contains succinct information about hash bin
            delegations. Note that succinct roles is not a TUF specification
            feature yet and setting `succinct_roles` to a value makes the
            resulting metadata non-compliant. The metadata will not be accepted
            as valid by specification compliant clients such as those built with
            python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Exactly one of ``roles`` and ``succinct_roles`` must be set.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keys: dict[str, Key],
        roles: dict[str, DelegatedRole] | None = None,
        succinct_roles: SuccinctRoles | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self.keys = keys

        # Either both are missing or both are given: neither is acceptable.
        if (roles is None) == (succinct_roles is None):
            raise ValueError("One of roles and succinct_roles must be set")

        if roles is not None and any(
            not role_name or role_name in TOP_LEVEL_ROLE_NAMES
            for role_name in roles
        ):
            raise ValueError(
                "Delegated roles cannot be empty or use top-level role names"
            )

        self.roles = roles
        self.succinct_roles = succinct_roles
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Delegations):
            return False

        attributes_match = (
            self.keys == other.keys
            and self.roles == other.roles
            and self.succinct_roles == other.succinct_roles
            and self.unrecognized_fields == other.unrecognized_fields
        )
        if self.roles is None or other.roles is None:
            return attributes_match

        # Dict equality ignores insertion order, but the order of the
        # delegated roles matters (see issue #1788).
        return attributes_match and list(self.roles.items()) == list(
            other.roles.items()
        )

    @classmethod
    def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations:
        """Create ``Delegations`` object from its json/dict representation.

        Recognized entries are popped from ``delegations_dict``; the dict
        that remains is passed to the constructor as the unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        raw_keys = delegations_dict.pop("keys")
        parsed_keys = {
            keyid: Key.from_dict(keyid, key_dict)
            for keyid, key_dict in raw_keys.items()
        }

        parsed_roles: dict[str, DelegatedRole] | None = None
        raw_roles = delegations_dict.pop("roles", None)
        if raw_roles is not None:
            parsed_roles = {}
            for raw_role in raw_roles:
                delegated = DelegatedRole.from_dict(raw_role)
                if delegated.name in parsed_roles:
                    raise ValueError(f"Duplicate role {delegated.name}")
                parsed_roles[delegated.name] = delegated

        raw_succinct = delegations_dict.pop("succinct_roles", None)
        parsed_succinct = (
            None
            if raw_succinct is None
            else SuccinctRoles.from_dict(raw_succinct)
        )

        # Whatever is still present in delegations_dict is unrecognized.
        return cls(parsed_keys, parsed_roles, parsed_succinct, delegations_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        serialized: dict[str, Any] = {
            "keys": {keyid: key.to_dict() for keyid, key in self.keys.items()}
        }
        serialized.update(self.unrecognized_fields)

        if self.roles is not None:
            serialized["roles"] = [
                delegated.to_dict() for delegated in self.roles.values()
            ]
        elif self.succinct_roles is not None:
            serialized["succinct_roles"] = self.succinct_roles.to_dict()

        return serialized

    def get_roles_for_target(
        self, target_filepath: str
    ) -> Iterator[tuple[str, bool]]:
        """Given ``target_filepath`` get names and terminating status of all
        delegated roles who are responsible for it.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        if self.roles is not None:
            yield from (
                (delegated.name, delegated.terminating)
                for delegated in self.roles.values()
                if delegated.is_delegated_path(target_filepath)
            )

        elif self.succinct_roles is not None:
            # Every succinct role is treated as terminating (see TAP 15).
            bin_name = self.succinct_roles.get_role_for_target(target_filepath)
            yield bin_name, True
1453
1454
class TargetFile(BaseFile):
    """A container with information about a particular target file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        length: Length of the target file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the target
            file content.
        path: URL path to a target file, relative to a base targets URL.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        length: int,
        hashes: dict[str, str],
        path: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        # Validate before storing anything on the instance.
        self._validate_length(length)
        self._validate_hashes(hashes)

        self.length = length
        self.hashes = hashes
        self.path = path
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    @property
    def custom(self) -> Any:  # noqa: ANN401
        """Get implementation specific data related to the target.

        This is the "custom" entry of ``unrecognized_fields`` (None when it
        is absent). python-tuf does not use or validate this data.
        """
        return self.unrecognized_fields.get("custom")

    def __eq__(self, other: object) -> bool:
        if isinstance(other, TargetFile):
            return (
                self.length == other.length
                and self.hashes == other.hashes
                and self.path == other.path
                and self.unrecognized_fields == other.unrecognized_fields
            )

        return False

    @classmethod
    def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile:
        """Create ``TargetFile`` object from its json/dict representation.

        "length" and "hashes" are popped from ``target_dict``; the dict that
        remains is passed to the constructor as the unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        return cls(
            target_dict.pop("length"),
            target_dict.pop("hashes"),
            path,
            # Everything still present in target_dict is unrecognized.
            target_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serializable dictionary representation of self."""
        serialized: dict[str, Any] = {
            "length": self.length,
            "hashes": self.hashes,
        }
        serialized.update(self.unrecognized_fields)
        return serialized

    @classmethod
    def from_file(
        cls,
        target_file_path: str,
        local_path: str,
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from a file.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            local_path: Local path to target file content.
            hash_algorithms: Hash algorithms to calculate hashes with. If not
                specified the securesystemslib default hash algorithm is used.

        Raises:
            FileNotFoundError: The file doesn't exist.
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        with open(local_path, "rb") as content:
            return cls.from_data(target_file_path, content, hash_algorithms)

    @classmethod
    def from_data(
        cls,
        target_file_path: str,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from bytes.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            data: Target file content.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified the securesystemslib default hash algorithm is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        measured = cls._get_length_and_hashes(data, hash_algorithms)
        content_length, content_hashes = measured
        return cls(content_length, content_hashes, target_file_path)

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that length and hashes of ``data`` match expected values.

        The length is checked first, then the hashes.

        Args:
            data: Target file object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        self._verify_length(data, self.length)
        self._verify_hashes(data, self.hashes)

    def get_prefixed_paths(self) -> list[str]:
        """
        Return hash-prefixed URL path fragments for the target file path.

        One path is produced per value in ``hashes``: the hash value and a
        "." are inserted in front of the last "/"-separated part of ``path``.
        """
        directory, separator, filename = self.path.rpartition("/")
        return [
            f"{directory}{separator}{hash_value}.{filename}"
            for hash_value in self.hashes.values()
        ]
1601
1602
class Targets(Signed, _DelegatorMixin):
    """A container for the signed part of targets metadata.

    Targets contains verifying information about target files and also
    delegates responsibility to other Targets roles.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        targets: Dictionary of target filenames to TargetFiles. Default is an
            empty dictionary.
        delegations: Defines how this Targets delegates responsibility to other
            Targets Metadata files. Default is None.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TARGETS

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        targets: dict[str, TargetFile] | None = None,
        delegations: Delegations | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.targets = {} if targets is None else targets
        self.delegations = delegations

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Targets):
            return (
                super().__eq__(other)
                and self.targets == other.targets
                and self.delegations == other.delegations
            )

        return False

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Targets:
        """Create ``Targets`` object from its json/dict representation.

        The targets and (optional) "delegations" entries are popped from
        ``signed_dict``; the dict that remains is passed to the constructor
        as the unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        raw_targets = signed_dict.pop(_TARGETS)

        # "delegations" is optional: only parse it when the key is present.
        if "delegations" in signed_dict:
            delegations = Delegations.from_dict(signed_dict.pop("delegations"))
        else:
            delegations = None

        parsed_targets = {
            target_path: TargetFile.from_dict(target_info, target_path)
            for target_path, target_info in raw_targets.items()
        }
        # Whatever is still present in signed_dict is unrecognized.
        return cls(*common_args, parsed_targets, delegations, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        serialized = self._common_fields_to_dict()
        serialized[_TARGETS] = {
            target_path: target_file.to_dict()
            for target_path, target_file in self.targets.items()
        }
        if self.delegations is not None:
            serialized["delegations"] = self.delegations.to_dict()
        return serialized

    def add_key(self, key: Key, role: str | None = None) -> None:
        """Add new signing key for delegated role ``role``.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if there are no
                delegated roles or if ``role`` is not delegated by this Target.
        """
        # Catch callers that still pass the arguments in the old order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        delegations = self.delegations
        if delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        # Pick the role object whose keyids list has to contain the key.
        if delegations.roles is not None:
            if role not in delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            key_holder = delegations.roles[role]
        elif delegations.succinct_roles is not None:
            key_holder = delegations.succinct_roles
        else:
            key_holder = None

        if key_holder is not None and key.keyid not in key_holder.keyids:
            key_holder.keyids.append(key.keyid)

        delegations.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str | None = None) -> None:
        """Revokes key from delegated role ``role`` and updates the delegations
        key store.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If there are no delegated roles or if ``role`` is not
                delegated by this ``Target`` or if key is not used by ``role``
                or if key with id ``keyid`` is not used by succinct roles.
        """
        delegations = self.delegations
        if delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if delegations.roles is not None:
            delegated_roles = delegations.roles
            if role not in delegated_roles:
                raise ValueError(f"Delegated role {role} doesn't exist")

            role_keyids = delegated_roles[role].keyids
            if keyid not in role_keyids:
                raise ValueError(f"Key with id {keyid} is not used by {role}")

            role_keyids.remove(keyid)
            # Keep the key in the key store while any role still uses it.
            if any(
                keyid in delegated.keyids
                for delegated in delegated_roles.values()
            ):
                return

        elif delegations.succinct_roles is not None:
            succinct_keyids = delegations.succinct_roles.keyids
            if keyid not in succinct_keyids:
                raise ValueError(
                    f"Key with id {keyid} is not used by succinct_roles"
                )

            succinct_keyids.remove(keyid)

        del delegations.keys[keyid]

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        delegations = self.delegations
        if delegations is None:
            raise ValueError("No delegations found")

        found: Role | None = None
        if delegations.roles is not None:
            found = delegations.roles.get(delegated_role)
        elif delegations.succinct_roles is not None:
            # All bins share the single succinct role object.
            candidate = delegations.succinct_roles
            found = (
                candidate
                if candidate.is_delegated_role(delegated_role)
                else None
            )

        if not found:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return found

    def get_key(self, keyid: str) -> Key:
        """Return the key with id ``keyid`` from the delegations key store.

        Raises ValueError if there are no delegations or if ``keyid`` is not
        in the delegations' keys.
        """
        delegations = self.delegations
        if delegations is None:
            raise ValueError("No delegations found")

        known_keys = delegations.keys
        if keyid not in known_keys:
            raise ValueError(f"Key {keyid} not found")

        return known_keys[keyid]