Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/_payload.py: 26%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
5"""Helper classes for low-level Metadata API."""
7from __future__ import annotations
9import abc
10import fnmatch
11import io
12import logging
13from dataclasses import dataclass
14from datetime import datetime, timezone
15from typing import (
16 IO,
17 TYPE_CHECKING,
18 Any,
19 ClassVar,
20 TypeVar,
21)
23from securesystemslib import exceptions as sslib_exceptions
24from securesystemslib import hash as sslib_hash
25from securesystemslib.signer import Key, Signature
27from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
29if TYPE_CHECKING:
30 from collections.abc import Iterator
32_ROOT = "root"
33_SNAPSHOT = "snapshot"
34_TARGETS = "targets"
35_TIMESTAMP = "timestamp"
37# We aim to support SPECIFICATION_VERSION and require the input metadata
38# files to have the same major version (the first number) as ours.
39SPECIFICATION_VERSION = ["1", "0", "31"]
40TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}
42logger = logging.getLogger(__name__)
44# T is a Generic type constraint for container payloads
45T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
class Signed(metaclass=abc.ABCMeta):
    """A base class for the signed part of TUF metadata.

    Objects with base class Signed are usually included in a ``Metadata``
    object on the signed attribute. This class provides attributes and
    methods that are common for all TUF metadata types (roles).

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. If None, then 1 is assigned.
        spec_version: Supported TUF specification version. If None, then the
            version currently supported by the library is assigned.
        expires: Metadata expiry date in UTC timezone. If None, then current
            date and time is assigned.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    # type is required for static reference without changing the API
    type: ClassVar[str] = "signed"

    # _type and type are identical: 1st replicates file format, 2nd passes lint
    @property
    def _type(self) -> str:
        return self.type

    @property
    def expires(self) -> datetime:
        """Get the metadata expiry date."""
        return self._expires

    @expires.setter
    def expires(self, value: datetime) -> None:
        """Set the metadata expiry date.

        Microseconds are dropped. A naive datetime is interpreted as UTC; an
        aware datetime must already be in UTC. The stored value is replaced
        only after ``value`` has passed validation, so a rejected value never
        leaves the object holding a non-UTC expiry.

            # Use 'datetime' module to e.g. expire in seven days from now
            obj.expires = datetime.now(timezone.utc) + timedelta(days=7)

        Raises:
            ValueError: ``value`` is timezone-aware but not UTC.
        """
        normalized = value.replace(microsecond=0)
        if normalized.tzinfo is None:
            # Naive datetime: just make it UTC
            normalized = normalized.replace(tzinfo=timezone.utc)
        elif normalized.tzinfo != timezone.utc:
            raise ValueError(f"Expected tz UTC, not {normalized.tzinfo}")
        self._expires = normalized

    # NOTE: "Signed" is a misleading name, because this might not be signed
    # yet, but we keep it to match spec terminology (it is often referred to
    # as "payload" or "inner metadata").
    def __init__(
        self,
        version: int | None,
        spec_version: str | None,
        expires: datetime | None,
        unrecognized_fields: dict[str, Any] | None,
    ):
        if spec_version is None:
            spec_version = ".".join(SPECIFICATION_VERSION)
        # Accept semver (X.Y.Z) but also X.Y for legacy compatibility
        spec_list = spec_version.split(".")
        if len(spec_list) not in [2, 3] or not all(
            el.isdigit() for el in spec_list
        ):
            raise ValueError(f"Failed to parse spec_version {spec_version}")

        # major version must match
        if spec_list[0] != SPECIFICATION_VERSION[0]:
            raise ValueError(f"Unsupported spec_version {spec_version}")

        self.spec_version = spec_version

        # Goes through the property setter, which normalizes and validates.
        self.expires = expires or datetime.now(timezone.utc)

        if version is None:
            version = 1
        elif version <= 0:
            raise ValueError(f"version must be > 0, got {version}")
        self.version = version

        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Signed):
            return False

        return (
            self.type == other.type
            and self.version == other.version
            and self.spec_version == other.spec_version
            and self.expires == other.expires
            and self.unrecognized_fields == other.unrecognized_fields
        )

    @abc.abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Serialize and return a dict representation of self."""
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Signed:
        """Deserialization helper, creates object from json/dict
        representation.
        """
        raise NotImplementedError

    @classmethod
    def _common_fields_from_dict(
        cls, signed_dict: dict[str, Any]
    ) -> tuple[int, str, datetime]:
        """Pop the common ``Signed`` fields out of ``signed_dict``.

        The consumed keys (``_type``, ``version``, ``spec_version`` and
        ``expires``) are removed from ``signed_dict`` in place.

        Returns:
            A ``(version, spec_version, expires)`` tuple, ordered so it can be
            passed as the leading positional arguments of a subclass
            constructor.

        Raises:
            ValueError: ``_type`` does not match ``cls.type`` or ``expires``
                does not have the expected format.
            KeyError: A common field is missing.
        """
        _type = signed_dict.pop("_type")
        if _type != cls.type:
            raise ValueError(f"Expected type {cls.type}, got {_type}")

        version = signed_dict.pop("version")
        spec_version = signed_dict.pop("spec_version")
        expires_str = signed_dict.pop("expires")
        # Convert 'expires' TUF metadata string to a datetime object, which is
        # what the constructor expects and what we store. The inverse operation
        # is implemented in '_common_fields_to_dict'.
        expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ").replace(
            tzinfo=timezone.utc
        )

        return version, spec_version, expires

    def _common_fields_to_dict(self) -> dict[str, Any]:
        """Return a dict representation of common fields of
        ``Signed`` instances, followed by any unrecognized fields.
        """
        return {
            "_type": self._type,
            "version": self.version,
            "spec_version": self.spec_version,
            "expires": self.expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
            **self.unrecognized_fields,
        }

    def is_expired(self, reference_time: datetime | None = None) -> bool:
        """Check metadata expiration against a reference time.

        Args:
            reference_time: Time to check expiration date against. A naive
                datetime is interpreted as UTC. Default is current UTC date
                and time.

        Returns:
            ``True`` if the expiration time is less than or equal to the
            reference time.
        """
        if reference_time is None:
            reference_time = datetime.now(timezone.utc)
        elif reference_time.tzinfo is None:
            # ``self.expires`` is always timezone-aware; comparing it with a
            # naive datetime would raise TypeError.
            reference_time = reference_time.replace(tzinfo=timezone.utc)

        return reference_time >= self.expires
class Role:
    """Container that defines which keys are required to sign roles metadata.

    A role records the accepted signing key identifiers and how many of
    those keys must sign for the role's metadata to count as signed.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keyids: Roles signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: ``keyids`` contains duplicates or ``threshold`` is below 1.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        has_duplicates = len(keyids) != len(set(keyids))
        if has_duplicates:
            raise ValueError(f"Nonunique keyids: {keyids}")
        if threshold < 1:
            raise ValueError("threshold should be at least 1!")

        self.keyids = keyids
        self.threshold = threshold
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Role):
            return False

        # Compared in declaration order, stopping at the first difference.
        compared = ("keyids", "threshold", "unrecognized_fields")
        return all(
            getattr(self, attr) == getattr(other, attr) for attr in compared
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> Role:
        """Create ``Role`` object from its json/dict representation.

        The recognized keys are popped from ``role_dict``; whatever remains
        is stored as the unrecognized fields.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        return cls(
            role_dict.pop("keyids"),
            role_dict.pop("threshold"),
            role_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        serialized: dict[str, Any] = {
            "keyids": self.keyids,
            "threshold": self.threshold,
        }
        serialized.update(self.unrecognized_fields)
        return serialized
@dataclass
class VerificationResult:
    """Signature verification result for delegated role metadata.

    Instances are truthy exactly when ``verified`` is true.

    Attributes:
        threshold: Number of required signatures.
        signed: dict of keyid to Key, containing keys that have signed.
        unsigned: dict of keyid to Key, containing keys that have not signed.
    """

    threshold: int
    signed: dict[str, Key]
    unsigned: dict[str, Key]

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if threshold of signatures is met."""
        return self.threshold <= len(self.signed)

    @property
    def missing(self) -> int:
        """Number of additional signatures required to reach threshold."""
        shortfall = self.threshold - len(self.signed)
        return shortfall if shortfall > 0 else 0
@dataclass
class RootVerificationResult:
    """Signature verification result for root metadata.

    Root must be verified by itself and the previous root version. This
    dataclass represents both results. For the edge case of first version
    of root, these underlying results are identical.

    Note that `signed` and `unsigned` correctness requires the underlying
    VerificationResult keys to not conflict (no reusing the same keyid for
    different keys).

    Attributes:
        first: First underlying VerificationResult
        second: Second underlying VerificationResult
    """

    first: VerificationResult
    second: VerificationResult

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if threshold of signatures is met in both underlying
        VerificationResults.
        """
        return all(part.verified for part in (self.first, self.second))

    @property
    def signed(self) -> dict[str, Key]:
        """Dictionary of all signing keys that have signed, from both
        VerificationResults.

        On a keyid present in both, the entry from ``second`` wins.
        """
        combined = dict(self.first.signed)
        combined.update(self.second.signed)
        return combined

    @property
    def unsigned(self) -> dict[str, Key]:
        """Dictionary of all signing keys that have not signed, from both
        VerificationResults.

        On a keyid present in both, the entry from ``second`` wins.
        """
        combined = dict(self.first.unsigned)
        combined.update(self.second.unsigned)
        return combined
class _DelegatorMixin(metaclass=abc.ABCMeta):
    """Class that implements verify_delegate() for Root and Targets"""

    @abc.abstractmethod
    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        raise NotImplementedError

    def get_verification_result(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> VerificationResult:
        """Return signature threshold verification result for delegated role.

        Every keyid listed for the role is sorted into "signed" or "unsigned".
        Keyids for which the delegator has no key are logged and left out of
        both groups.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        role metadata is not fully verified.

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``delegated_role``.
        """
        role = self.get_delegated_role(delegated_role)

        verified_keys = {}
        unverified_keys = {}

        for keyid in role.keyids:
            try:
                key = self.get_key(keyid)
            except ValueError:
                logger.info("No key for keyid %s", keyid)
                continue

            if keyid not in signatures:
                unverified_keys[keyid] = key
                logger.info("No signature for keyid %s", keyid)
                continue

            try:
                key.verify_signature(signatures[keyid], payload)
            except sslib_exceptions.UnverifiedSignatureError:
                unverified_keys[keyid] = key
                logger.info("Key %s failed to verify %s", keyid, delegated_role)
            else:
                verified_keys[keyid] = key

        return VerificationResult(
            role.threshold, verified_keys, unverified_keys
        )

    def verify_delegate(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> None:
        """Verify signature threshold for delegated role.

        Verify that there are enough valid ``signatures`` over ``payload``, to
        meet the threshold of keys for ``delegated_role``, as defined by the
        delegator (``self``).

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            UnsignedMetadataError: ``delegated_role`` was not signed with
                required threshold of keys.
            ValueError: no delegation was found for ``delegated_role``.
        """
        outcome = self.get_verification_result(
            delegated_role, payload, signatures
        )
        if outcome:
            return

        signed_count = len(outcome.signed)
        raise UnsignedMetadataError(
            f"{delegated_role} was signed by {signed_count}/"
            f"{outcome.threshold} keys"
        )
class Root(Signed, _DelegatorMixin):
    """A container for the signed part of root metadata.

    Parameters listed below are also instance attributes.

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
            Default is empty dictionary.
        roles: Dictionary of role names to Roles. Defines which keys are
            required to sign the metadata for a specific role. Default is
            a dictionary of top level roles without keys and threshold of 1.
        consistent_snapshot: ``True`` if repository supports consistent
            snapshots. Default is True.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _ROOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        keys: dict[str, Key] | None = None,
        roles: dict[str, Role] | None = None,
        consistent_snapshot: bool | None = True,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.consistent_snapshot = consistent_snapshot
        self.keys = keys if keys is not None else {}

        if roles is None:
            roles = {r: Role([], 1) for r in TOP_LEVEL_ROLE_NAMES}
        elif set(roles) != TOP_LEVEL_ROLE_NAMES:
            raise ValueError("Role names must be the top-level metadata roles")
        self.roles = roles

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Root):
            return False

        return (
            super().__eq__(other)
            and self.keys == other.keys
            and self.roles == other.roles
            and self.consistent_snapshot == other.consistent_snapshot
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Root:
        """Create ``Root`` object from its json/dict representation.

        Recognized fields are popped from ``signed_dict`` and the nested
        ``keys``/``roles`` dictionaries are converted in place.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        consistent_snapshot = signed_dict.pop("consistent_snapshot", None)
        keys = signed_dict.pop("keys")
        roles = signed_dict.pop("roles")

        # Only values of existing entries are replaced, so mutating the dicts
        # while iterating over them is safe.
        for keyid, key_dict in keys.items():
            keys[keyid] = Key.from_dict(keyid, key_dict)
        for role_name, role_dict in roles.items():
            roles[role_name] = Role.from_dict(role_dict)

        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, keys, roles, consistent_snapshot, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        root_dict = self._common_fields_to_dict()
        keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()}
        roles = {
            role_name: role.to_dict() for role_name, role in self.roles.items()
        }
        # ``consistent_snapshot`` is optional: only emit it when it is set.
        if self.consistent_snapshot is not None:
            root_dict["consistent_snapshot"] = self.consistent_snapshot

        root_dict.update(
            {
                "keys": keys,
                "roles": roles,
            }
        )
        return root_dict

    def add_key(self, key: Key, role: str) -> None:
        """Add new signing key for delegated role ``role``.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if ``role`` doesn't
                exist.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if key.keyid not in self.roles[role].keyids:
            self.roles[role].keyids.append(key.keyid)
        self.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str) -> None:
        """Revoke key from ``role`` and updates the key store.

        The key is dropped from the key store only when no other role still
        references it.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If ``role`` doesn't exist or if ``role`` doesn't include
                the key.
        """
        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")
        if keyid not in self.roles[role].keyids:
            raise ValueError(f"Key with id {keyid} is not used by {role}")
        self.roles[role].keyids.remove(keyid)

        if any(keyid in other.keyids for other in self.roles.values()):
            return

        # A role may list a keyid that has no entry in the key store; the
        # role has already been updated at this point, so a missing entry
        # must not surface as an undocumented KeyError.
        self.keys.pop(keyid, None)

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if delegated_role not in self.roles:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return self.roles[delegated_role]

    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if key is not found.
        """
        if keyid not in self.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.keys[keyid]

    def get_root_verification_result(
        self,
        previous: Root | None,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> RootVerificationResult:
        """Return signature threshold verification result for two root roles.

        Verify root metadata with two roles (`self` and optionally `previous`).

        If the repository has no root role versions yet, `previous` can be left
        None. In all other cases, `previous` must be the previous version of
        the Root.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        root metadata is not fully verified.

        Args:
            previous: The previous `Root` to verify payload with, or None
            payload: Signed payload bytes for root
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``root`` or given Root
                versions are not sequential.
        """

        if previous is None:
            # First root version: it is verified against itself twice.
            previous = self
        elif self.version != previous.version + 1:
            versions = f"v{previous.version} and v{self.version}"
            raise ValueError(
                f"Expected sequential root versions, got {versions}."
            )

        return RootVerificationResult(
            previous.get_verification_result(Root.type, payload, signatures),
            self.get_verification_result(Root.type, payload, signatures),
        )
class BaseFile:
    """A base class of ``MetaFile`` and ``TargetFile``.

    Encapsulates common static methods for length and hash verification.
    """

    @staticmethod
    def _verify_hashes(
        data: bytes | IO[bytes], expected_hashes: dict[str, str]
    ) -> None:
        """Verify that the hash of ``data`` matches ``expected_hashes``.

        Raises:
            LengthOrHashMismatchError: An algorithm is not supported or a
                computed digest differs from the expected one.
        """
        for algorithm, expected in expected_hashes.items():
            try:
                if isinstance(data, bytes):
                    hasher = sslib_hash.digest(algorithm)
                    hasher.update(data)
                else:
                    # if data is not bytes, assume it is a file object
                    hasher = sslib_hash.digest_fileobject(data, algorithm)
            except (
                sslib_exceptions.UnsupportedAlgorithmError,
                sslib_exceptions.FormatError,
            ) as e:
                raise LengthOrHashMismatchError(
                    f"Unsupported algorithm '{algorithm}'"
                ) from e

            actual = hasher.hexdigest()
            if actual == expected:
                continue
            raise LengthOrHashMismatchError(
                f"Observed hash {actual} does not match "
                f"expected hash {expected}"
            )

    @staticmethod
    def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None:
        """Verify that the length of ``data`` matches ``expected_length``.

        For a file object the length is read by seeking to its end; the
        position is left there.

        Raises:
            LengthOrHashMismatchError: The observed length differs.
        """
        if isinstance(data, bytes):
            actual = len(data)
        else:
            # if data is not bytes, assume it is a file object
            data.seek(0, io.SEEK_END)
            actual = data.tell()

        if actual == expected_length:
            return
        raise LengthOrHashMismatchError(
            f"Observed length {actual} does not match "
            f"expected length {expected_length}"
        )

    @staticmethod
    def _validate_hashes(hashes: dict[str, str]) -> None:
        """Check that ``hashes`` is a non-empty mapping of strings to strings.

        Raises:
            ValueError: ``hashes`` is empty.
            TypeError: A key or a value is not a string.
        """
        if not hashes:
            raise ValueError("Hashes must be a non empty dictionary")
        only_strings = all(
            isinstance(item, str) for pair in hashes.items() for item in pair
        )
        if not only_strings:
            raise TypeError("Hashes items must be strings")

    @staticmethod
    def _validate_length(length: int) -> None:
        """Check that ``length`` is not negative.

        Raises:
            ValueError: ``length`` is below zero.
        """
        if length < 0:
            raise ValueError(f"Length must be >= 0, got {length}")

    @staticmethod
    def _get_length_and_hashes(
        data: bytes | IO[bytes], hash_algorithms: list[str] | None
    ) -> tuple[int, dict[str, str]]:
        """Calculate length and hashes of ``data``.

        When ``hash_algorithms`` is None the securesystemslib default
        algorithm is used.

        Raises:
            ValueError: An algorithm is not supported.
        """
        data_is_bytes = isinstance(data, bytes)

        if data_is_bytes:
            length = len(data)
        else:
            data.seek(0, io.SEEK_END)
            length = data.tell()

        if hash_algorithms is None:
            hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]

        digests = {}
        for name in hash_algorithms:
            try:
                if data_is_bytes:
                    hasher = sslib_hash.digest(name)
                    hasher.update(data)
                else:
                    hasher = sslib_hash.digest_fileobject(data, name)
            except (
                sslib_exceptions.UnsupportedAlgorithmError,
                sslib_exceptions.FormatError,
            ) as e:
                raise ValueError(f"Unsupported algorithm '{name}'") from e

            digests[name] = hasher.hexdigest()

        return (length, digests)
class MetaFile(BaseFile):
    """A container with information about a particular metadata file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Version of the metadata file.
        length: Length of the metadata file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the metadata
            file content.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        version: int = 1,
        length: int | None = None,
        hashes: dict[str, str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if version <= 0:
            raise ValueError(f"Metafile version must be > 0, got {version}")
        # length and hashes are optional; validate them only when given.
        if length is not None:
            self._validate_length(length)
        if hashes is not None:
            self._validate_hashes(hashes)

        self.version = version
        self.length = length
        self.hashes = hashes
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, MetaFile):
            return False

        return (
            self.version == other.version
            and self.length == other.length
            and self.hashes == other.hashes
            and self.unrecognized_fields == other.unrecognized_fields
        )

    @classmethod
    def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile:
        """Create ``MetaFile`` object from its json/dict representation.

        Recognized fields are popped from ``meta_dict``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        version = meta_dict.pop("version")
        length = meta_dict.pop("length", None)
        hashes = meta_dict.pop("hashes", None)

        # All fields left in the meta_dict are unrecognized.
        return cls(version, length, hashes, meta_dict)

    @classmethod
    def from_data(
        cls,
        version: int,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> MetaFile:
        """Creates MetaFile object from bytes.

        This constructor should only be used if hashes are wanted.
        By default, MetaFile(ver) should be used.

        Args:
            version: Version of the metadata file.
            data: Metadata bytes that the metafile represents.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, the securesystemslib default hash algorithm is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(version, length, hashes)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        res_dict: dict[str, Any] = {
            "version": self.version,
            **self.unrecognized_fields,
        }

        # Optional fields are emitted only when they are set.
        if self.length is not None:
            res_dict["length"] = self.length

        if self.hashes is not None:
            res_dict["hashes"] = self.hashes

        return res_dict

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that the length and hashes of ``data`` match expected values.

        A check is skipped when the corresponding attribute is None.

        Args:
            data: File object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        if self.length is not None:
            self._verify_length(data, self.length)

        if self.hashes is not None:
            self._verify_hashes(data, self.hashes)
class Timestamp(Signed):
    """A container for the signed part of timestamp metadata.

    TUF file format uses a dictionary to contain the snapshot information:
    this is not the case with ``Timestamp.snapshot_meta`` which is a
    ``MetaFile``.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        snapshot_meta: Meta information for snapshot metadata. Default is a
            MetaFile with version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TIMESTAMP

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        snapshot_meta: MetaFile | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        if not snapshot_meta:
            snapshot_meta = MetaFile(1)
        self.snapshot_meta = snapshot_meta

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Timestamp):
            return False

        common_equal = super().__eq__(other)
        return common_equal and self.snapshot_meta == other.snapshot_meta

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp:
        """Create ``Timestamp`` object from its json/dict representation.

        Recognized fields are popped from ``signed_dict``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        # The file format nests the entry under the "snapshot.json" key.
        snapshot_entry = signed_dict.pop("meta")["snapshot.json"]
        snapshot_meta = MetaFile.from_dict(snapshot_entry)
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, snapshot_meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        timestamp_dict = self._common_fields_to_dict()
        snapshot_entry = self.snapshot_meta.to_dict()
        timestamp_dict["meta"] = {"snapshot.json": snapshot_entry}
        return timestamp_dict
class Snapshot(Signed):
    """A container for the signed part of snapshot metadata.

    Snapshot contains information about all target Metadata files.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        meta: Dictionary of targets filenames to ``MetaFile`` objects. Default
            is a dictionary with a MetaFile for "targets.json" version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _SNAPSHOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        meta: dict[str, MetaFile] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        if meta is None:
            meta = {"targets.json": MetaFile(1)}
        self.meta = meta

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Snapshot):
            return False

        common_equal = super().__eq__(other)
        return common_equal and self.meta == other.meta

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot:
        """Create ``Snapshot`` object from its json/dict representation.

        Recognized fields are popped from ``signed_dict``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        raw_meta = signed_dict.pop("meta")
        meta = {
            path: MetaFile.from_dict(entry) for path, entry in raw_meta.items()
        }
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        snapshot_dict = self._common_fields_to_dict()
        snapshot_dict["meta"] = {
            path: info.to_dict() for path, info in self.meta.items()
        }
        return snapshot_dict
class DelegatedRole(Role):
    """A container with information about a delegated role.

    A delegation can happen in two ways:

        - ``paths`` is set: delegates targets matching any path pattern in
          ``paths``
        - ``path_hash_prefixes`` is set: delegates targets whose target path
          hash starts with any of the prefixes in ``path_hash_prefixes``

        ``paths`` and ``path_hash_prefixes`` are mutually exclusive:
        both cannot be set, at least one of them must be set.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        name: Delegated role name.
        keyids: Delegated role signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        terminating: ``True`` if this delegation terminates a target lookup.
        paths: Path patterns. See note above.
        path_hash_prefixes: Hash prefixes. See note above.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        name: str,
        keyids: list[str],
        threshold: int,
        terminating: bool,
        paths: list[str] | None = None,
        path_hash_prefixes: list[str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(keyids, threshold, unrecognized_fields)
        self.name = name
        self.terminating = terminating

        # Exactly one of the two delegation styles has to be provided: the
        # check fails when both are None and when both are set.
        if (paths is None) == (path_hash_prefixes is None):
            raise ValueError(
                "Only one of (paths, path_hash_prefixes) must be set"
            )

        if paths is not None and any(not isinstance(p, str) for p in paths):
            raise ValueError("Paths must be strings")
        if path_hash_prefixes is not None and any(
            not isinstance(p, str) for p in path_hash_prefixes
        ):
            raise ValueError("Path_hash_prefixes must be strings")

        self.paths = paths
        self.path_hash_prefixes = path_hash_prefixes

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DelegatedRole):
            return False

        return (
            super().__eq__(other)
            and self.name == other.name
            and self.terminating == other.terminating
            and self.paths == other.paths
            and self.path_hash_prefixes == other.path_hash_prefixes
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole:
        """Create ``DelegatedRole`` object from its json/dict representation.

        Known keys are popped from ``role_dict``; the remaining entries are
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        name = role_dict.pop("name")
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        terminating = role_dict.pop("terminating")
        paths = role_dict.pop("paths", None)
        path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
        # All fields left in the role_dict are unrecognized.
        return cls(
            name,
            keyids,
            threshold,
            terminating,
            paths,
            path_hash_prefixes,
            role_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        res_dict = {
            "name": self.name,
            "terminating": self.terminating,
            **base_role_dict,
        }
        if self.paths is not None:
            res_dict["paths"] = self.paths
        elif self.path_hash_prefixes is not None:
            res_dict["path_hash_prefixes"] = self.path_hash_prefixes
        return res_dict

    @staticmethod
    def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool:
        """Determine whether ``targetpath`` matches the ``pathpattern``.

        Both arguments are split on "/" and compared component by component,
        so a wildcard never matches across a directory separator.
        """
        # We need to make sure that targetpath and pathpattern are pointing to
        # the same directory as fnmatch doesn't treat "/" as a special symbol.
        target_parts = targetpath.split("/")
        pattern_parts = pathpattern.split("/")
        if len(target_parts) != len(pattern_parts):
            return False

        # Every part in the pathpattern could include a glob pattern, that's
        # why each of the target and pathpattern parts should match.
        # fnmatchcase() is used instead of fnmatch(): the latter normalizes
        # case with os.path.normcase(), which would make the result of a
        # delegation lookup depend on the operating system of the client.
        return all(
            fnmatch.fnmatchcase(target_dir, pattern_dir)
            for target_dir, pattern_dir in zip(target_parts, pattern_parts)
        )

    def is_delegated_path(self, target_filepath: str) -> bool:
        """Determine whether the given ``target_filepath`` is in one of
        the paths that ``DelegatedRole`` is trusted to provide.

        The ``target_filepath`` and the ``DelegatedRole`` paths are expected to
        be in their canonical forms, so e.g. "a/b" instead of "a//b" . Only "/"
        is supported as target path separator. Leading separators are not
        handled as special cases (see `TUF specification on targetpath
        <https://theupdateframework.github.io/specification/latest/#targetpath>`_).

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        if self.path_hash_prefixes is not None:
            # Calculate the hash of the filepath
            # to determine in which bin to find the target.
            digest_object = sslib_hash.digest(algorithm="sha256")
            digest_object.update(target_filepath.encode("utf-8"))
            target_filepath_hash = digest_object.hexdigest()

            return any(
                target_filepath_hash.startswith(path_hash_prefix)
                for path_hash_prefix in self.path_hash_prefixes
            )

        if self.paths is not None:
            # A delegated role path may be an explicit path or glob
            # pattern (Unix shell-style wildcards).
            return any(
                self._is_target_in_pathpattern(target_filepath, pathpattern)
                for pathpattern in self.paths
            )

        return False
class SuccinctRoles(Role):
    """Succinctly defines a hash bin delegation graph.

    A ``SuccinctRoles`` object describes a delegation graph that covers all
    targets, distributing them uniformly over the delegated roles (i.e. bins)
    in the graph.

    The total number of bins is 2 to the power of the passed ``bit_length``.

    Bin names are the concatenation of the passed ``name_prefix`` and a
    zero-padded hex representation of the bin index separated by a hyphen.

    The passed ``keyids`` and ``threshold`` is used for each bin, and each bin
    is 'terminating'.

    For details: https://github.com/theupdateframework/taps/blob/master/tap15.md

    Args:
        keyids: Signing key identifiers for any bin metadata.
        threshold: Number of keys required to sign any bin metadata.
        bit_length: Number of bits between 1 and 32.
        name_prefix: Prefix of all bin names.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError, TypeError, AttributeError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        bit_length: int,
        name_prefix: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(keyids, threshold, unrecognized_fields)

        if bit_length <= 0 or bit_length > 32:
            raise ValueError("bit_length must be between 1 and 32")
        if not isinstance(name_prefix, str):
            raise ValueError("name_prefix must be a string")

        self.bit_length = bit_length
        self.name_prefix = name_prefix

        # Calculate the suffix_len value based on the total number of bins in
        # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will
        # have a suffix between "000" and "3ff" in hex and suffix_len will be 3
        # meaning the third bin will have a suffix of "003".
        self.number_of_bins = 2**bit_length
        # suffix_len is calculated based on "number_of_bins - 1" as the name
        # of the last bin contains the number "number_of_bins -1" as a suffix.
        self.suffix_len = len(f"{self.number_of_bins - 1:x}")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SuccinctRoles):
            return False

        return (
            super().__eq__(other)
            and self.bit_length == other.bit_length
            and self.name_prefix == other.name_prefix
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles:
        """Create ``SuccinctRoles`` object from its json/dict representation.

        Known keys are popped from ``role_dict``; the remaining entries are
        stored as unrecognized fields.

        Raises:
            ValueError, KeyError, AttributeError, TypeError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        bit_length = role_dict.pop("bit_length")
        name_prefix = role_dict.pop("name_prefix")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, bit_length, name_prefix, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        return {
            "bit_length": self.bit_length,
            "name_prefix": self.name_prefix,
            **base_role_dict,
        }

    def get_role_for_target(self, target_filepath: str) -> str:
        """Calculate the name of the delegated role responsible for
        ``target_filepath``.

        The target at path ``target_filepath`` is assigned to a bin by casting
        the left-most ``bit_length`` of bits of the file path hash digest to
        int, using it as bin index between 0 and ``2**bit_length - 1``.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        hasher = sslib_hash.digest(algorithm="sha256")
        hasher.update(target_filepath.encode("utf-8"))

        # We can't ever need more than 4 bytes (32 bits).
        hash_bytes = hasher.digest()[:4]
        # Right shift hash bytes, so that we only have the leftmost
        # bit_length bits that we care about.
        shift_value = 32 - self.bit_length
        bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value
        # Add zero padding if necessary and cast to hex the suffix.
        suffix = f"{bin_number:0{self.suffix_len}x}"
        return f"{self.name_prefix}-{suffix}"

    def get_roles(self) -> Iterator[str]:
        """Yield the names of all different delegated roles one by one."""
        for i in range(self.number_of_bins):
            suffix = f"{i:0{self.suffix_len}x}"
            yield f"{self.name_prefix}-{suffix}"

    def is_delegated_role(self, role_name: str) -> bool:
        """Determine whether the given ``role_name`` is in one of
        the delegated roles that ``SuccinctRoles`` represents.

        A name is accepted only if it has the exact form produced by
        ``get_roles()``: ``name_prefix``, a hyphen, and ``suffix_len``
        lower-case hex digits encoding an index below ``number_of_bins``.

        Args:
            role_name: The name of the role to check against.
        """
        desired_prefix = self.name_prefix + "-"

        if not role_name.startswith(desired_prefix):
            return False

        suffix = role_name[len(desired_prefix) :]
        if len(suffix) != self.suffix_len:
            return False

        # Check the digits explicitly instead of relying on int(suffix, 16)
        # raising: int() also accepts a sign, surrounding whitespace,
        # underscores, a "0x" prefix and upper-case digits, none of which
        # appear in the bin names generated by this class.
        if any(char not in "0123456789abcdef" for char in suffix):
            return False

        return int(suffix, 16) < self.number_of_bins
class Delegations:
    """Container holding everything a targets role delegates.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
        roles: Ordered dictionary of role names to DelegatedRoles instances. It
            defines which keys are required to sign the metadata for a specific
            role. The roles order also defines the order that role delegations
            are considered during target searches.
        succinct_roles: Contains succinct information about hash bin
            delegations. Note that succinct roles is not a TUF specification
            feature yet and setting `succinct_roles` to a value makes the
            resulting metadata non-compliant. The metadata will not be accepted
            as valid by specification compliant clients such as those built with
            python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Exactly one of ``roles`` and ``succinct_roles`` must be set.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keys: dict[str, Key],
        roles: dict[str, DelegatedRole] | None = None,
        succinct_roles: SuccinctRoles | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self.keys = keys

        # Both unset and both set are rejected alike.
        if (roles is None) == (succinct_roles is None):
            raise ValueError("One of roles and succinct_roles must be set")

        if roles is not None and any(
            not role_name or role_name in TOP_LEVEL_ROLE_NAMES
            for role_name in roles
        ):
            raise ValueError(
                "Delegated roles cannot be empty or use top-level role names"
            )

        self.roles = roles
        self.succinct_roles = succinct_roles
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Delegations):
            return False

        attributes_match = (
            self.keys == other.keys
            and self.roles == other.roles
            and self.succinct_roles == other.succinct_roles
            and self.unrecognized_fields == other.unrecognized_fields
        )
        if not attributes_match:
            return False

        if self.roles is None or other.roles is None:
            return True

        # Order of the delegated roles matters (see issue #1788), and plain
        # dict equality ignores it, so compare the item sequences as well.
        return list(self.roles.items()) == list(other.roles.items())

    @classmethod
    def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations:
        """Build a ``Delegations`` object from its json/dict representation.

        Known keys are popped from ``delegations_dict``; the remaining
        entries are stored as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        key_dicts = delegations_dict.pop("keys")
        parsed_keys = {
            keyid: Key.from_dict(keyid, key_dict)
            for keyid, key_dict in key_dicts.items()
        }

        role_dicts = delegations_dict.pop("roles", None)
        parsed_roles: dict[str, DelegatedRole] | None = None
        if role_dicts is not None:
            parsed_roles = {}
            for role_dict in role_dicts:
                delegated = DelegatedRole.from_dict(role_dict)
                # Role names have to be unique within one delegations object.
                if delegated.name in parsed_roles:
                    raise ValueError(f"Duplicate role {delegated.name}")
                parsed_roles[delegated.name] = delegated

        succinct_dict = delegations_dict.pop("succinct_roles", None)
        parsed_succinct = (
            None
            if succinct_dict is None
            else SuccinctRoles.from_dict(succinct_dict)
        )

        # Anything still present in delegations_dict is unrecognized.
        return cls(parsed_keys, parsed_roles, parsed_succinct, delegations_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        serialized: dict[str, Any] = {
            "keys": {keyid: key.to_dict() for keyid, key in self.keys.items()},
            **self.unrecognized_fields,
        }
        if self.roles is not None:
            serialized["roles"] = [
                delegated.to_dict() for delegated in self.roles.values()
            ]
        elif self.succinct_roles is not None:
            serialized["succinct_roles"] = self.succinct_roles.to_dict()

        return serialized

    def get_roles_for_target(
        self, target_filepath: str
    ) -> Iterator[tuple[str, bool]]:
        """Yield name and terminating status of every delegated role that is
        responsible for ``target_filepath``.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        if self.roles is not None:
            yield from (
                (delegated.name, delegated.terminating)
                for delegated in self.roles.values()
                if delegated.is_delegated_path(target_filepath)
            )
        elif self.succinct_roles is not None:
            # We consider all succinct_roles as terminating.
            # For more information read TAP 15.
            bin_name = self.succinct_roles.get_role_for_target(target_filepath)
            yield bin_name, True
class TargetFile(BaseFile):
    """Container describing a single target file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        length: Length of the target file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the target
            file content.
        path: URL path to a target file, relative to a base targets URL.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        length: int,
        hashes: dict[str, str],
        path: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        # Validate before any attribute is assigned.
        self._validate_length(length)
        self._validate_hashes(hashes)

        self.length = length
        self.hashes = hashes
        self.path = path
        self.unrecognized_fields = (
            {} if unrecognized_fields is None else unrecognized_fields
        )

    @property
    def custom(self) -> Any:  # noqa: ANN401
        """Implementation specific data related to the target.

        Read from the "custom" entry of the unrecognized fields; None when
        there is no such entry. python-tuf does not use or validate this data.
        """
        return self.unrecognized_fields.get("custom")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TargetFile):
            return False

        compared = ("length", "hashes", "path", "unrecognized_fields")
        return all(
            getattr(self, attr) == getattr(other, attr) for attr in compared
        )

    @classmethod
    def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile:
        """Build a ``TargetFile`` object from its json/dict representation.

        "length" and "hashes" are popped from ``target_dict``; the remaining
        entries are stored as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        # Arguments are evaluated left to right, so both pops happen before
        # target_dict is handed over as the unrecognized fields.
        return cls(
            target_dict.pop("length"),
            target_dict.pop("hashes"),
            path,
            target_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serializable dictionary representation of self."""
        serialized = {"length": self.length, "hashes": self.hashes}
        serialized.update(self.unrecognized_fields)
        return serialized

    @classmethod
    def from_file(
        cls,
        target_file_path: str,
        local_path: str,
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Build a ``TargetFile`` object from a file on disk.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            local_path: Local path to target file content.
            hash_algorithms: Hash algorithms to calculate hashes with. If not
                specified the securesystemslib default hash algorithm is used.

        Raises:
            FileNotFoundError: The file doesn't exist.
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        with open(local_path, "rb") as content:
            target = cls.from_data(target_file_path, content, hash_algorithms)
        return target

    @classmethod
    def from_data(
        cls,
        target_file_path: str,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Build a ``TargetFile`` object from bytes or a binary file object.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            data: Target file content.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified the securesystemslib default hash algorithm is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        measured = cls._get_length_and_hashes(data, hash_algorithms)
        length, hashes = measured
        return cls(length, hashes, target_file_path)

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Check that length and hashes of ``data`` match the expected values.

        Args:
            data: Target file object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        # Length is checked first, then the hashes.
        self._verify_length(data, self.length)
        self._verify_hashes(data, self.hashes)

    def get_prefixed_paths(self) -> list[str]:
        """
        Return hash-prefixed URL path fragments for the target file path.

        One fragment is produced per entry in ``hashes``: the hash value and
        a dot are inserted in front of the last "/"-separated path component.
        """
        parent, sep, name = self.path.rpartition("/")
        return [
            f"{parent}{sep}{hash_value}.{name}"
            for hash_value in self.hashes.values()
        ]
class Targets(Signed, _DelegatorMixin):
    """A container for the signed part of targets metadata.

    Targets contains verifying information about target files and also
    delegates responsibility to other Targets roles.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        targets: Dictionary of target filenames to TargetFiles. Default is an
            empty dictionary.
        delegations: Defines how this Targets delegates responsibility to other
            Targets Metadata files. Default is None.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TARGETS

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        targets: dict[str, TargetFile] | None = None,
        delegations: Delegations | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.targets = targets if targets is not None else {}
        self.delegations = delegations

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Targets):
            return False

        return (
            super().__eq__(other)
            and self.targets == other.targets
            and self.delegations == other.delegations
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Targets:
        """Create ``Targets`` object from its json/dict representation.

        The "targets" and "delegations" entries are popped from
        ``signed_dict``; the dictionary that is left over is handed to the
        constructor as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        targets = signed_dict.pop(_TARGETS)
        # "delegations" is optional: a missing key means no delegations.
        try:
            delegations_dict = signed_dict.pop("delegations")
        except KeyError:
            delegations = None
        else:
            delegations = Delegations.from_dict(delegations_dict)
        res_targets = {
            target_path: TargetFile.from_dict(target_info, target_path)
            for target_path, target_info in targets.items()
        }
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, res_targets, delegations, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        targets_dict = self._common_fields_to_dict()
        targets_dict[_TARGETS] = {
            target_path: target_file_obj.to_dict()
            for target_path, target_file_obj in self.targets.items()
        }
        # The "delegations" entry is emitted only when delegations exist.
        if self.delegations is not None:
            targets_dict["delegations"] = self.delegations.to_dict()
        return targets_dict

    def add_key(self, key: Key, role: str | None = None) -> None:
        """Add new signing key for delegated role ``role``.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if there are no
                delegated roles or if ``role`` is not delegated by this Target.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            if key.keyid not in self.delegations.roles[role].keyids:
                self.delegations.roles[role].keyids.append(key.keyid)

        elif self.delegations.succinct_roles is not None:
            if key.keyid not in self.delegations.succinct_roles.keyids:
                self.delegations.succinct_roles.keyids.append(key.keyid)

        # The key store entry is (re)written even if the keyid was already
        # listed for the role.
        self.delegations.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str | None = None) -> None:
        """Revokes key from delegated role ``role`` and updates the delegations
        key store.

        If succinct_roles is used then the ``role`` argument is not required.

        The key is dropped from the key store only when no delegated role
        lists ``keyid`` any more. A key store that has no entry for ``keyid``
        is left as is.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If there are no delegated roles or if ``role`` is not
                delegated by this ``Target`` or if key is not used by ``role``
                or if key with id ``keyid`` is not used by succinct roles.
        """
        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            if keyid not in self.delegations.roles[role].keyids:
                raise ValueError(f"Key with id {keyid} is not used by {role}")

            self.delegations.roles[role].keyids.remove(keyid)
            # Keep the key in the key store while another role still uses it.
            if any(
                keyid in keyinfo.keyids
                for keyinfo in self.delegations.roles.values()
            ):
                return

        elif self.delegations.succinct_roles is not None:
            if keyid not in self.delegations.succinct_roles.keyids:
                raise ValueError(
                    f"Key with id {keyid} is not used by succinct_roles"
                )

            self.delegations.succinct_roles.keyids.remove(keyid)

        # pop() rather than del: a role may list a keyid that has no entry in
        # the key store, and the revocation above has already been applied, so
        # a KeyError here would report failure for a completed operation.
        self.delegations.keys.pop(keyid, None)

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        With succinct roles the shared ``SuccinctRoles`` object is returned
        for every bin name it represents.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")

        role: Role | None = None
        if self.delegations.roles is not None:
            role = self.delegations.roles.get(delegated_role)
        elif self.delegations.succinct_roles is not None:
            succinct = self.delegations.succinct_roles
            if succinct.is_delegated_role(delegated_role):
                role = succinct

        if role is None:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return role

    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        The key is looked up in the key store of this role's delegations.

        Raises ValueError if there are no delegations or if ``keyid`` is not
        found in the delegations key store.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")
        if keyid not in self.delegations.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.delegations.keys[keyid]