Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/_payload.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
5"""Helper classes for low-level Metadata API."""
7from __future__ import annotations
9import abc
10import fnmatch
11import hashlib
12import io
13import logging
14import sys
15from dataclasses import dataclass
16from datetime import datetime, timezone
17from typing import (
18 IO,
19 TYPE_CHECKING,
20 Any,
21 ClassVar,
22 TypeVar,
23)
25from securesystemslib import exceptions as sslib_exceptions
26from securesystemslib.signer import Key, Signature
28from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
30if TYPE_CHECKING:
31 from collections.abc import Iterator
# Names of the top-level TUF roles.
_ROOT = "root"
_SNAPSHOT = "snapshot"
_TARGETS = "targets"
_TIMESTAMP = "timestamp"

# Algorithm used when a caller does not name any hash algorithms.
_DEFAULT_HASH_ALGORITHM = "sha256"
# Custom algorithm name that ``_get_digest`` maps to ``hashlib.blake2b`` with
# a 32 byte digest.
_BLAKE_HASH_ALGORITHM = "blake2b-256"

# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = ["1", "0", "31"]
# Set of role names that ``Root.roles`` must contain, no more and no less.
TOP_LEVEL_ROLE_NAMES = {_ROOT, _TIMESTAMP, _SNAPSHOT, _TARGETS}

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# T is a type variable constrained to the four container payload classes.
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
def _get_digest(algo: str) -> Any:  # noqa: ANN401
    """Create a fresh hash object for the algorithm named ``algo``.

    The custom name "blake2b-256" is mapped to ``hashlib.blake2b`` with a
    32 byte digest; every other name is passed to ``hashlib.new`` unchanged.
    """
    if algo != _BLAKE_HASH_ALGORITHM:
        return hashlib.new(algo)

    return hashlib.blake2b(digest_size=32)
def _hash_bytes(data: bytes, algo: str) -> str:
    """Return the hex digest of ``data`` computed with algorithm ``algo``."""
    hasher = _get_digest(algo)
    hasher.update(data)
    return hasher.hexdigest()
def _hash_file(f: IO[bytes], algo: str) -> str:
    """Return the hex digest of the whole content of ``f`` using ``algo``.

    The file object is rewound to its start before hashing.
    """
    f.seek(0)
    if sys.version_info < (3, 11):
        # Fallback for older Pythons. Chunk size is taken from the previously
        # used and now deprecated `securesystemslib.hash.digest_fileobject`.
        hasher = _get_digest(algo)
        while True:
            block = f.read(4096)
            if block == b"":
                break
            hasher.update(block)
        return hasher.hexdigest()

    hasher = hashlib.file_digest(f, lambda: _get_digest(algo))  # type: ignore[arg-type]
    return hasher.hexdigest()
class Signed(metaclass=abc.ABCMeta):
    """A base class for the signed part of TUF metadata.

    Objects with base class Signed are usually included in a ``Metadata`` object
    on the signed attribute. This class provides attributes and methods that
    are common for all TUF metadata types (roles).

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. If None, then 1 is assigned.
        spec_version: Supported TUF specification version. If None, then the
            version currently supported by the library is assigned.
        expires: Metadata expiry date in UTC timezone. If None, then current
            date and time is assigned.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    # type is required for static reference without changing the API
    type: ClassVar[str] = "signed"

    # _type and type are identical: 1st replicates file format, 2nd passes lint
    @property
    def _type(self) -> str:
        return self.type

    @property
    def expires(self) -> datetime:
        """Get the metadata expiry date."""
        return self._expires

    @expires.setter
    def expires(self, value: datetime) -> None:
        """Set the metadata expiry date.

        Microseconds are dropped. A naive datetime is interpreted as UTC; an
        aware datetime must already be in UTC, otherwise ValueError is raised.

        # Use 'datetime' module to e.g. expire in seven days from now
        obj.expires = datetime.now(timezone.utc) + timedelta(days=7)
        """
        self._expires = value.replace(microsecond=0)
        if self._expires.tzinfo is None:
            # Naive datetime: just make it UTC
            self._expires = self._expires.replace(tzinfo=timezone.utc)
        elif self._expires.tzinfo != timezone.utc:
            raise ValueError(f"Expected tz UTC, not {self._expires.tzinfo}")

    # NOTE: Signed is a stupid name, because this might not be signed yet, but
    # we keep it to match spec terminology (I often refer to this as "payload",
    # or "inner metadata")
    def __init__(
        self,
        version: int | None,
        spec_version: str | None,
        expires: datetime | None,
        unrecognized_fields: dict[str, Any] | None,
    ):
        if spec_version is None:
            spec_version = ".".join(SPECIFICATION_VERSION)
        # Accept semver (X.Y.Z) but also X.Y for legacy compatibility
        spec_list = spec_version.split(".")
        if len(spec_list) not in [2, 3] or not all(
            el.isdigit() for el in spec_list
        ):
            raise ValueError(f"Failed to parse spec_version {spec_version}")

        # major version must match
        if spec_list[0] != SPECIFICATION_VERSION[0]:
            raise ValueError(f"Unsupported spec_version {spec_version}")

        self.spec_version = spec_version

        # Goes through the property setter, which normalizes the value
        self.expires = expires or datetime.now(timezone.utc)

        if version is None:
            version = 1
        elif version <= 0:
            raise ValueError(f"version must be > 0, got {version}")
        self.version = version

        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Signed):
            return False

        return (
            self.type == other.type
            and self.version == other.version
            and self.spec_version == other.spec_version
            and self.expires == other.expires
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # 'unrecognized_fields' is a dict (possibly holding further dicts or
        # lists) and therefore unhashable; including it made every hash()
        # call raise TypeError. Leaving it out keeps the hash consistent with
        # __eq__: equal objects still always produce equal hashes.
        return hash(
            (
                self.type,
                self.version,
                self.spec_version,
                self.expires,
            )
        )

    @abc.abstractmethod
    def to_dict(self) -> dict[str, Any]:
        """Serialize and return a dict representation of self."""
        raise NotImplementedError

    @classmethod
    @abc.abstractmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Signed:
        """Deserialization helper, creates object from json/dict
        representation.
        """
        raise NotImplementedError

    @classmethod
    def _common_fields_from_dict(
        cls, signed_dict: dict[str, Any]
    ) -> tuple[int, str, datetime]:
        """Remove the common fields of ``Signed`` instances from the passed
        dict representation and return them as an ordered tuple, to be passed
        as leading positional arguments to a subclass constructor.

        See ``{Root, Timestamp, Snapshot, Targets}.from_dict``
        methods for usage.

        Raises:
            ValueError: "_type" does not match the type of ``cls`` or
                "expires" is not in the expected format.
            KeyError: A common field is missing.
        """
        _type = signed_dict.pop("_type")
        if _type != cls.type:
            raise ValueError(f"Expected type {cls.type}, got {_type}")

        version = signed_dict.pop("version")
        spec_version = signed_dict.pop("spec_version")
        expires_str = signed_dict.pop("expires")
        # Convert 'expires' TUF metadata string to a datetime object, which is
        # what the constructor expects and what we store. The inverse operation
        # is implemented in '_common_fields_to_dict'.
        expires = datetime.strptime(expires_str, "%Y-%m-%dT%H:%M:%SZ").replace(
            tzinfo=timezone.utc
        )

        return version, spec_version, expires

    def _common_fields_to_dict(self) -> dict[str, Any]:
        """Return a dict representation of common fields of
        ``Signed`` instances.

        See ``{Root, Timestamp, Snapshot, Targets}.to_dict`` methods for usage.

        """
        return {
            "_type": self._type,
            "version": self.version,
            "spec_version": self.spec_version,
            "expires": self.expires.strftime("%Y-%m-%dT%H:%M:%SZ"),
            **self.unrecognized_fields,
        }

    def is_expired(self, reference_time: datetime | None = None) -> bool:
        """Check metadata expiration against a reference time.

        Args:
            reference_time: Time to check expiration date against. A naive
                datetime is interpreted as UTC, mirroring the ``expires``
                setter. Default is current UTC date and time.

        Returns:
            ``True`` if expiration time is less than or equal to the
            reference time.
        """
        if reference_time is None:
            reference_time = datetime.now(timezone.utc)
        elif reference_time.tzinfo is None:
            # 'expires' is always timezone-aware; comparing it with a naive
            # datetime would raise TypeError.
            reference_time = reference_time.replace(tzinfo=timezone.utc)

        return reference_time >= self.expires
class Role:
    """Container that defines which keys are required to sign roles metadata.

    Role defines how many keys are required to successfully sign the roles
    metadata, and which keys are accepted.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keyids: Roles signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if len(set(keyids)) != len(keyids):
            raise ValueError(f"Nonunique keyids: {keyids}")
        if threshold < 1:
            raise ValueError("threshold should be at least 1!")
        self.keyids = keyids
        self.threshold = threshold
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Role):
            return False

        return (
            self.keyids == other.keyids
            and self.threshold == other.threshold
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # 'keyids' is a list and 'unrecognized_fields' a dict: hashing them
        # directly always raised TypeError. The keyids are hashed as a tuple
        # (order matters for __eq__ too); 'unrecognized_fields' may hold
        # arbitrary unhashable values and is left out, which keeps the hash
        # consistent with __eq__ since equal roles still hash equally.
        return hash((tuple(self.keyids), self.threshold))

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> Role:
        """Create ``Role`` object from its json/dict representation.

        The recognized entries are removed from ``role_dict``; the same dict
        object is then stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        return {
            "keyids": self.keyids,
            "threshold": self.threshold,
            **self.unrecognized_fields,
        }
@dataclass
class VerificationResult:
    """Outcome of checking signatures for delegated role metadata.

    Instances are truthy exactly when ``verified`` is true.

    Attributes:
        threshold: Number of required signatures.
        signed: dict of keyid to Key, containing keys that have signed.
        unsigned: dict of keyid to Key, containing keys that have not signed.
    """

    threshold: int
    signed: dict[str, Key]
    unsigned: dict[str, Key]

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if the number of signing keys reaches the threshold."""
        return self.threshold <= len(self.signed)

    @property
    def missing(self) -> int:
        """How many more signatures are needed to reach the threshold."""
        shortfall = self.threshold - len(self.signed)
        return shortfall if shortfall > 0 else 0
@dataclass
class RootVerificationResult:
    """Outcome of checking signatures for root metadata.

    Root has to be verified both by itself and by the previous root version,
    so this dataclass bundles the two underlying results. For the very first
    root version the two results are identical.

    Note that `signed` and `unsigned` correctness requires the underlying
    VerificationResult keys to not conflict (no reusing the same keyid for
    different keys).

    Attributes:
        first: First underlying VerificationResult
        second: Second underlying VerificationResult
    """

    first: VerificationResult
    second: VerificationResult

    def __bool__(self) -> bool:
        return self.verified

    @property
    def verified(self) -> bool:
        """True if both underlying VerificationResults met their threshold."""
        if not self.first.verified:
            return False
        return self.second.verified

    @property
    def signed(self) -> dict[str, Key]:
        """All keys that have signed, merged from both VerificationResults."""
        # On a shared keyid the entry from 'second' wins
        return {**self.first.signed, **self.second.signed}

    @property
    def unsigned(self) -> dict[str, Key]:
        """All keys that have not signed, merged from both
        VerificationResults.
        """
        # On a shared keyid the entry from 'second' wins
        return {**self.first.unsigned, **self.second.unsigned}
class _DelegatorMixin(metaclass=abc.ABCMeta):
    """Mixin providing signature threshold verification for delegators.

    Subclasses supply ``get_delegated_role()`` and ``get_key()``; this class
    builds ``get_verification_result()`` and ``verify_delegate()`` on them.
    """

    @abc.abstractmethod
    def get_delegated_role(self, delegated_role: str) -> Role:
        """Look up the role object of the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def get_key(self, keyid: str) -> Key:
        """Look up the key object of the given keyid.

        Raises ValueError if key is not found.
        """
        raise NotImplementedError

    def get_verification_result(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> VerificationResult:
        """Compute the signature threshold verification result for a
        delegated role.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        role metadata is not fully verified.

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``delegated_role``.
        """
        role = self.get_delegated_role(delegated_role)

        verified_keys = {}
        unverified_keys = {}

        for keyid in role.keyids:
            try:
                key = self.get_key(keyid)
            except ValueError:
                # A keyid without a known key ends up in neither dict
                logger.info("No key for keyid %s", keyid)
                continue

            if keyid in signatures:
                try:
                    key.verify_signature(signatures[keyid], payload)
                except sslib_exceptions.UnverifiedSignatureError:
                    unverified_keys[keyid] = key
                    logger.info(
                        "Key %s failed to verify %s", keyid, delegated_role
                    )
                else:
                    verified_keys[keyid] = key
            else:
                unverified_keys[keyid] = key
                logger.info("No signature for keyid %s", keyid)

        return VerificationResult(
            role.threshold, verified_keys, unverified_keys
        )

    def verify_delegate(
        self,
        delegated_role: str,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> None:
        """Verify signature threshold for delegated role.

        Check that enough valid ``signatures`` over ``payload`` exist to meet
        the threshold of keys for ``delegated_role``, as defined by the
        delegator (``self``).

        Args:
            delegated_role: Name of the delegated role to verify
            payload: Signed payload bytes for the delegated role
            signatures: Signatures over payload bytes

        Raises:
            UnsignedMetadataError: ``delegated_role`` was not signed with
                the required threshold of keys.
            ValueError: no delegation was found for ``delegated_role``.
        """
        outcome = self.get_verification_result(
            delegated_role, payload, signatures
        )
        if outcome:
            return

        signed_count = len(outcome.signed)
        raise UnsignedMetadataError(
            f"{delegated_role} was signed by {signed_count}/"
            f"{outcome.threshold} keys"
        )
class Root(Signed, _DelegatorMixin):
    """Container for the signed portion of root metadata.

    Parameters listed below are also instance attributes.

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
            Default is empty dictionary.
        roles: Dictionary of role names to Roles. Defines which keys are
            required to sign the metadata for a specific role. Default is
            a dictionary of top level roles without keys and threshold of 1.
        consistent_snapshot: ``True`` if repository supports consistent
            snapshots. Default is True.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _ROOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        keys: dict[str, Key] | None = None,
        roles: dict[str, Role] | None = None,
        consistent_snapshot: bool | None = True,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.consistent_snapshot = consistent_snapshot

        if keys is None:
            keys = {}
        self.keys = keys

        if roles is None:
            # One keyless role with threshold 1 per top-level role name
            roles = {}
            for role_name in TOP_LEVEL_ROLE_NAMES:
                roles[role_name] = Role([], 1)
        elif set(roles) != TOP_LEVEL_ROLE_NAMES:
            raise ValueError("Role names must be the top-level metadata roles")
        self.roles = roles

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Root):
            return False

        return (
            super().__eq__(other)
            and self.keys == other.keys
            and self.roles == other.roles
            and self.consistent_snapshot == other.consistent_snapshot
        )

    def __hash__(self) -> int:
        members = (
            super().__hash__(),
            self.keys,
            self.roles,
            self.consistent_snapshot,
            self.unrecognized_fields,
        )
        return hash(members)

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Root:
        """Build a ``Root`` object from its json/dict representation.

        Recognized entries are removed from ``signed_dict`` and the nested
        key and role dicts are converted to objects in place.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        consistent_snapshot = signed_dict.pop("consistent_snapshot", None)
        keys = signed_dict.pop("keys")
        roles = signed_dict.pop("roles")

        # Replace the raw dict values with deserialized objects, key by key
        for keyid in list(keys):
            keys[keyid] = Key.from_dict(keyid, keys[keyid])
        for role_name in list(roles):
            roles[role_name] = Role.from_dict(roles[role_name])

        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, keys, roles, consistent_snapshot, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        root_dict = self._common_fields_to_dict()
        keys = {}
        for keyid, key in self.keys.items():
            keys[keyid] = key.to_dict()
        roles = {
            role_name: role.to_dict() for role_name, role in self.roles.items()
        }

        # 'consistent_snapshot' is optional in the serialized form
        if self.consistent_snapshot is not None:
            root_dict["consistent_snapshot"] = self.consistent_snapshot

        root_dict["keys"] = keys
        root_dict["roles"] = roles
        return root_dict

    def add_key(self, key: Key, role: str) -> None:
        """Add new signing key for delegated role ``role``.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if ``role`` doesn't
                exist.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")

        role_keyids = self.roles[role].keyids
        if key.keyid not in role_keyids:
            role_keyids.append(key.keyid)
        self.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str) -> None:
        """Revoke key from ``role`` and update the key store.

        The key itself is dropped from ``keys`` only when no role uses it
        anymore.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If ``role`` doesn't exist or if ``role`` doesn't include
                the key.
        """
        if role not in self.roles:
            raise ValueError(f"Role {role} doesn't exist")

        role_keyids = self.roles[role].keyids
        if keyid not in role_keyids:
            raise ValueError(f"Key with id {keyid} is not used by {role}")
        role_keyids.remove(keyid)

        still_used = any(
            keyid in keyinfo.keyids for keyinfo in self.roles.values()
        )
        if still_used:
            return

        del self.keys[keyid]

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Look up the role object of the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if delegated_role in self.roles:
            return self.roles[delegated_role]

        raise ValueError(f"Delegated role {delegated_role} not found")

    def get_key(self, keyid: str) -> Key:
        """Look up the key object of the given keyid.

        Raises ValueError if key is not found.
        """
        if keyid in self.keys:
            return self.keys[keyid]

        raise ValueError(f"Key {keyid} not found")

    def get_root_verification_result(
        self,
        previous: Root | None,
        payload: bytes,
        signatures: dict[str, Signature],
    ) -> RootVerificationResult:
        """Compute the signature threshold verification result for two root
        roles.

        Verify root metadata with two roles (`self` and optionally `previous`).

        If the repository has no root role versions yet, `previous` can be left
        None. In all other cases, `previous` must be the previous version of
        the Root.

        NOTE: Unlike `verify_delegate()` this method does not raise, if the
        root metadata is not fully verified.

        Args:
            previous: The previous `Root` to verify payload with, or None
            payload: Signed payload bytes for root
            signatures: Signatures over payload bytes

        Raises:
            ValueError: no delegation was found for ``root`` or given Root
                versions are not sequential.
        """

        if previous is None:
            # First root version: it vouches for itself on both sides
            previous = self
        elif self.version != previous.version + 1:
            versions = f"v{previous.version} and v{self.version}"
            raise ValueError(
                f"Expected sequential root versions, got {versions}."
            )

        by_previous = previous.get_verification_result(
            Root.type, payload, signatures
        )
        by_self = self.get_verification_result(Root.type, payload, signatures)
        return RootVerificationResult(by_previous, by_self)
class BaseFile:
    """Common base of ``MetaFile`` and ``TargetFile``.

    Bundles the static helpers used for length and hash verification.
    """

    @staticmethod
    def _verify_hashes(
        data: bytes | IO[bytes], expected_hashes: dict[str, str]
    ) -> None:
        """Check that ``data`` hashes to every entry of ``expected_hashes``.

        Raises LengthOrHashMismatchError on an unsupported algorithm or on
        the first digest that differs.
        """
        for algo, exp_hash in expected_hashes.items():
            # Anything that is not bytes is assumed to be a file object
            hasher = _hash_bytes if isinstance(data, bytes) else _hash_file
            try:
                observed_hash = hasher(data, algo)
            except (ValueError, TypeError) as e:
                raise LengthOrHashMismatchError(
                    f"Unsupported algorithm '{algo}'"
                ) from e

            if observed_hash == exp_hash:
                continue

            raise LengthOrHashMismatchError(
                f"Observed hash {observed_hash} does not match "
                f"expected hash {exp_hash}"
            )

    @staticmethod
    def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None:
        """Check that ``data`` is exactly ``expected_length`` bytes long.

        Raises LengthOrHashMismatchError otherwise.
        """
        if not isinstance(data, bytes):
            # if data is not bytes, assume it is a file object
            data.seek(0, io.SEEK_END)
            observed_length = data.tell()
        else:
            observed_length = len(data)

        if observed_length == expected_length:
            return

        raise LengthOrHashMismatchError(
            f"Observed length {observed_length} does not match "
            f"expected length {expected_length}"
        )

    @staticmethod
    def _validate_hashes(hashes: dict[str, str]) -> None:
        """Require a non-empty dict whose keys and values are all strings."""
        if not hashes:
            raise ValueError("Hashes must be a non empty dictionary")
        for algo, digest in hashes.items():
            if not isinstance(algo, str) or not isinstance(digest, str):
                raise TypeError("Hashes items must be strings")

    @staticmethod
    def _validate_length(length: int) -> None:
        """Require ``length`` to be zero or positive."""
        if length < 0:
            raise ValueError(f"Length must be >= 0, got {length}")

    @staticmethod
    def _get_length_and_hashes(
        data: bytes | IO[bytes], hash_algorithms: list[str] | None
    ) -> tuple[int, dict[str, str]]:
        """Compute the length of ``data`` and its hash per given algorithm.

        With ``hash_algorithms`` set to None the default algorithm is used.
        Raises ValueError on an unsupported algorithm.
        """
        is_bytes = isinstance(data, bytes)
        if is_bytes:
            length = len(data)
        else:
            data.seek(0, io.SEEK_END)
            length = data.tell()

        if hash_algorithms is None:
            hash_algorithms = [_DEFAULT_HASH_ALGORITHM]

        hashes = {}
        for algorithm in hash_algorithms:
            try:
                if is_bytes:
                    digest = _hash_bytes(data, algorithm)
                else:
                    digest = _hash_file(data, algorithm)
            except (ValueError, TypeError) as e:
                raise ValueError(f"Unsupported algorithm '{algorithm}'") from e
            hashes[algorithm] = digest

        return (length, hashes)
class MetaFile(BaseFile):
    """A container with information about a particular metadata file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Version of the metadata file.
        length: Length of the metadata file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the metadata
            file content.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        version: int = 1,
        length: int | None = None,
        hashes: dict[str, str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        if version <= 0:
            raise ValueError(f"Metafile version must be > 0, got {version}")
        # length and hashes are optional; validate them only when given
        if length is not None:
            self._validate_length(length)
        if hashes is not None:
            self._validate_hashes(hashes)

        self.version = version
        self.length = length
        self.hashes = hashes
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, MetaFile):
            return False

        return (
            self.version == other.version
            and self.length == other.length
            and self.hashes == other.hashes
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # 'hashes' and 'unrecognized_fields' are dicts: hashing them directly
        # always raised TypeError. 'hashes' maps str to str, so a frozenset
        # of its items is hashable and, like dict equality, independent of
        # insertion order. 'unrecognized_fields' may hold arbitrary
        # unhashable values and is left out, which keeps the hash consistent
        # with __eq__ since equal objects still hash equally.
        hashes = None
        if self.hashes is not None:
            hashes = frozenset(self.hashes.items())
        return hash((self.version, self.length, hashes))

    @classmethod
    def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile:
        """Create ``MetaFile`` object from its json/dict representation.

        The recognized entries are removed from ``meta_dict``; the same dict
        object is then stored as ``unrecognized_fields``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        version = meta_dict.pop("version")
        length = meta_dict.pop("length", None)
        hashes = meta_dict.pop("hashes", None)

        # All fields left in the meta_dict are unrecognized.
        return cls(version, length, hashes, meta_dict)

    @classmethod
    def from_data(
        cls,
        version: int,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> MetaFile:
        """Creates MetaFile object from bytes.
        This constructor should only be used if hashes are wanted.
        By default, MetaFile(ver) should be used.

        Args:
            version: Version of the metadata file.
            data: Metadata bytes that the metafile represents.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, "sha256" is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        # None is passed through: the helper substitutes the default algorithm
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(version, length, hashes)

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of self."""
        res_dict: dict[str, Any] = {
            "version": self.version,
            **self.unrecognized_fields,
        }

        # Optional fields are serialized only when they are set
        if self.length is not None:
            res_dict["length"] = self.length

        if self.hashes is not None:
            res_dict["hashes"] = self.hashes

        return res_dict

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that the length and hashes of ``data`` match expected values.

        A check is skipped when the corresponding attribute is None.

        Args:
            data: File object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        if self.length is not None:
            self._verify_length(data, self.length)

        if self.hashes is not None:
            self._verify_hashes(data, self.hashes)
class Timestamp(Signed):
    """Container for the signed portion of timestamp metadata.

    The TUF file format stores the snapshot information in a dictionary;
    ``Timestamp.snapshot_meta`` instead holds it directly as a ``MetaFile``.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        snapshot_meta: Meta information for snapshot metadata. Default is a
            MetaFile with version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TIMESTAMP

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        snapshot_meta: MetaFile | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.snapshot_meta = snapshot_meta if snapshot_meta else MetaFile(1)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Timestamp):
            return False

        if not super().__eq__(other):
            return False
        return self.snapshot_meta == other.snapshot_meta

    def __hash__(self) -> int:
        members = (super().__hash__(), self.snapshot_meta)
        return hash(members)

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp:
        """Build a ``Timestamp`` object from its json/dict representation.

        Recognized entries are removed from ``signed_dict``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        snapshot_dict = signed_dict.pop("meta")["snapshot.json"]
        snapshot_meta = MetaFile.from_dict(snapshot_dict)
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, snapshot_meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        timestamp_dict = self._common_fields_to_dict()
        meta = {"snapshot.json": self.snapshot_meta.to_dict()}
        timestamp_dict["meta"] = meta
        return timestamp_dict
class Snapshot(Signed):
    """Container for the signed portion of snapshot metadata.

    Snapshot holds information about all target Metadata files.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        meta: Dictionary of targets filenames to ``MetaFile`` objects. Default
            is a dictionary with a Metafile for "targets.json" version 1.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _SNAPSHOT

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        meta: dict[str, MetaFile] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(version, spec_version, expires, unrecognized_fields)
        if meta is None:
            meta = {"targets.json": MetaFile(1)}
        self.meta = meta

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Snapshot):
            return False

        return super().__eq__(other) and self.meta == other.meta

    def __hash__(self) -> int:
        members = (super().__hash__(), self.meta)
        return hash(members)

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot:
        """Build a ``Snapshot`` object from its json/dict representation.

        Recognized entries are removed from ``signed_dict``.

        Raises:
            ValueError, KeyError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        raw_meta = signed_dict.pop("meta")
        meta = {
            meta_path: MetaFile.from_dict(meta_dict)
            for meta_path, meta_dict in raw_meta.items()
        }
        # All fields left in the signed_dict are unrecognized.
        return cls(*common_args, meta, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        snapshot_dict = self._common_fields_to_dict()
        snapshot_dict["meta"] = {
            meta_path: meta_info.to_dict()
            for meta_path, meta_info in self.meta.items()
        }
        return snapshot_dict
class DelegatedRole(Role):
    """A container with information about a delegated role.

    A delegation can happen in two ways:

        - ``paths`` is set: delegates targets matching any path pattern in
          ``paths``
        - ``path_hash_prefixes`` is set: delegates targets whose target path
          hash starts with any of the prefixes in ``path_hash_prefixes``

        ``paths`` and ``path_hash_prefixes`` are mutually exclusive:
        both cannot be set, at least one of them must be set.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        name: Delegated role name.
        keyids: Delegated role signing key identifiers.
        threshold: Number of keys required to sign this role's metadata.
        terminating: ``True`` if this delegation terminates a target lookup.
        paths: Path patterns. See note above.
        path_hash_prefixes: Hash prefixes. See note above.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        name: str,
        keyids: list[str],
        threshold: int,
        terminating: bool,
        paths: list[str] | None = None,
        path_hash_prefixes: list[str] | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        super().__init__(keyids, threshold, unrecognized_fields)
        self.name = name
        self.terminating = terminating

        # Exactly one of the two delegation mechanisms has to be provided.
        exclusive_vars = [paths, path_hash_prefixes]
        if sum(1 for var in exclusive_vars if var is not None) != 1:
            raise ValueError(
                "Only one of (paths, path_hash_prefixes) must be set"
            )

        if paths is not None and any(not isinstance(p, str) for p in paths):
            raise ValueError("Paths must be strings")
        if path_hash_prefixes is not None and any(
            not isinstance(p, str) for p in path_hash_prefixes
        ):
            raise ValueError("Path_hash_prefixes must be strings")

        self.paths = paths
        self.path_hash_prefixes = path_hash_prefixes

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DelegatedRole):
            return False

        return (
            super().__eq__(other)
            and self.name == other.name
            and self.terminating == other.terminating
            and self.paths == other.paths
            and self.path_hash_prefixes == other.path_hash_prefixes
        )

    def __hash__(self) -> int:
        # Lists are unhashable, so the (ordered) list attributes are hashed as
        # tuples; list equality in __eq__ is order-sensitive as well.
        paths = None if self.paths is None else tuple(self.paths)
        prefixes = (
            None
            if self.path_hash_prefixes is None
            else tuple(self.path_hash_prefixes)
        )
        return hash(
            (
                super().__hash__(),
                self.name,
                self.terminating,
                paths,
                prefixes,
            )
        )

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole:
        """Create ``DelegatedRole`` object from its json/dict representation.

        Recognized entries are popped from ``role_dict``; whatever remains is
        passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        name = role_dict.pop("name")
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        terminating = role_dict.pop("terminating")
        paths = role_dict.pop("paths", None)
        path_hash_prefixes = role_dict.pop("path_hash_prefixes", None)
        # All fields left in the role_dict are unrecognized.
        return cls(
            name,
            keyids,
            threshold,
            terminating,
            paths,
            path_hash_prefixes,
            role_dict,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        res_dict = {
            "name": self.name,
            "terminating": self.terminating,
            **base_role_dict,
        }
        if self.paths is not None:
            res_dict["paths"] = self.paths
        elif self.path_hash_prefixes is not None:
            res_dict["path_hash_prefixes"] = self.path_hash_prefixes
        return res_dict

    @staticmethod
    def _is_target_in_pathpattern(targetpath: str, pathpattern: str) -> bool:
        """Determine whether ``targetpath`` matches the ``pathpattern``."""
        # We need to make sure that targetpath and pathpattern are pointing to
        # the same directory as fnmatch doesn't treat "/" as a special symbol.
        target_parts = targetpath.split("/")
        pattern_parts = pathpattern.split("/")
        if len(target_parts) != len(pattern_parts):
            return False

        # Every part in the pathpattern could include a glob pattern, that's why
        # each of the target and pathpattern parts should match.
        return all(
            fnmatch.fnmatchcase(target, pattern)
            for target, pattern in zip(
                target_parts, pattern_parts, strict=True
            )
        )

    def is_delegated_path(self, target_filepath: str) -> bool:
        """Determine whether the given ``target_filepath`` is in one of
        the paths that ``DelegatedRole`` is trusted to provide.

        The ``target_filepath`` and the ``DelegatedRole`` paths are expected to
        be in their canonical forms, so e.g. "a/b" instead of "a//b" . Only "/"
        is supported as target path separator. Leading separators are not
        handled as special cases (see `TUF specification on targetpath
        <https://theupdateframework.github.io/specification/latest/#targetpath>`_).

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """

        if self.path_hash_prefixes is not None:
            # Calculate the hash of the filepath
            # to determine in which bin to find the target.
            digest_object = hashlib.new(name="sha256")
            digest_object.update(target_filepath.encode("utf-8"))
            target_filepath_hash = digest_object.hexdigest()

            return any(
                target_filepath_hash.startswith(path_hash_prefix)
                for path_hash_prefix in self.path_hash_prefixes
            )

        if self.paths is not None:
            # A delegated role path may be an explicit path or glob
            # pattern (Unix shell-style wildcards).
            return any(
                self._is_target_in_pathpattern(target_filepath, pathpattern)
                for pathpattern in self.paths
            )

        return False
class SuccinctRoles(Role):
    """Succinctly defines a hash bin delegation graph.

    A ``SuccinctRoles`` object describes a delegation graph that covers all
    targets, distributing them uniformly over the delegated roles (i.e. bins)
    in the graph.

    The total number of bins is 2 to the power of the passed ``bit_length``.

    Bin names are the concatenation of the passed ``name_prefix`` and a
    zero-padded hex representation of the bin index separated by a hyphen.

    The passed ``keyids`` and ``threshold`` is used for each bin, and each bin
    is 'terminating'.

    For details: https://github.com/theupdateframework/taps/blob/master/tap15.md

    Args:
        keyids: Signing key identifiers for any bin metadata.
        threshold: Number of keys required to sign any bin metadata.
        bit_length: Number of bits between 1 and 32.
        name_prefix: Prefix of all bin names.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API.

    Raises:
        ValueError, TypeError, AttributeError: Invalid arguments.
    """

    def __init__(
        self,
        keyids: list[str],
        threshold: int,
        bit_length: int,
        name_prefix: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(keyids, threshold, unrecognized_fields)

        if bit_length <= 0 or bit_length > 32:
            raise ValueError("bit_length must be between 1 and 32")
        if not isinstance(name_prefix, str):
            raise ValueError("name_prefix must be a string")

        self.bit_length = bit_length
        self.name_prefix = name_prefix

        # Calculate the suffix_len value based on the total number of bins in
        # hex. If bit_length = 10 then number_of_bins = 1024 or bin names will
        # have a suffix between "000" and "3ff" in hex and suffix_len will be 3
        # meaning the third bin will have a suffix of "003".
        self.number_of_bins = 2**bit_length
        # suffix_len is calculated based on "number_of_bins - 1" as the name
        # of the last bin contains the number "number_of_bins -1" as a suffix.
        self.suffix_len = len(f"{self.number_of_bins - 1:x}")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SuccinctRoles):
            return False

        return (
            super().__eq__(other)
            and self.bit_length == other.bit_length
            and self.name_prefix == other.name_prefix
        )

    def __hash__(self) -> int:
        return hash((super().__hash__(), self.bit_length, self.name_prefix))

    @classmethod
    def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles:
        """Create ``SuccinctRoles`` object from its json/dict representation.

        Recognized entries are popped from ``role_dict``; whatever remains is
        passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, AttributeError, TypeError: Invalid arguments.
        """
        keyids = role_dict.pop("keyids")
        threshold = role_dict.pop("threshold")
        bit_length = role_dict.pop("bit_length")
        name_prefix = role_dict.pop("name_prefix")
        # All fields left in the role_dict are unrecognized.
        return cls(keyids, threshold, bit_length, name_prefix, role_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        base_role_dict = super().to_dict()
        return {
            "bit_length": self.bit_length,
            "name_prefix": self.name_prefix,
            **base_role_dict,
        }

    def get_role_for_target(self, target_filepath: str) -> str:
        """Calculate the name of the delegated role responsible for
        ``target_filepath``.

        The target at path ``target_filepath`` is assigned to a bin by casting
        the left-most ``bit_length`` of bits of the file path hash digest to
        int, using it as bin index between 0 and ``2**bit_length - 1``.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        hasher = hashlib.new(name="sha256")
        hasher.update(target_filepath.encode("utf-8"))

        # We can't ever need more than 4 bytes (32 bits).
        hash_bytes = hasher.digest()[:4]
        # Right shift hash bytes, so that we only have the leftmost
        # bit_length bits that we care about.
        shift_value = 32 - self.bit_length
        bin_number = int.from_bytes(hash_bytes, byteorder="big") >> shift_value
        # Add zero padding if necessary and cast to hex the suffix.
        suffix = f"{bin_number:0{self.suffix_len}x}"
        return f"{self.name_prefix}-{suffix}"

    def get_roles(self) -> Iterator[str]:
        """Yield the names of all different delegated roles one by one."""
        for i in range(self.number_of_bins):
            suffix = f"{i:0{self.suffix_len}x}"
            yield f"{self.name_prefix}-{suffix}"

    def is_delegated_role(self, role_name: str) -> bool:
        """Determine whether the given ``role_name`` is in one of
        the delegated roles that ``SuccinctRoles`` represents.

        Only names in exactly the form produced by ``get_roles()`` are
        accepted: the prefix, a hyphen and a lowercase, zero-padded hex bin
        index of length ``suffix_len``.

        Args:
            role_name: The name of the role to check against.
        """
        desired_prefix = self.name_prefix + "-"

        if not role_name.startswith(desired_prefix):
            return False

        suffix = role_name[len(desired_prefix) :]
        if len(suffix) != self.suffix_len:
            return False

        try:
            # make sure suffix is hex value
            num = int(suffix, 16)
        except ValueError:
            return False

        if not 0 <= num < self.number_of_bins:
            return False

        # int() is lenient: it also accepts a sign, surrounding whitespace,
        # underscores, a "0x" prefix and uppercase digits. None of those
        # spellings is ever generated as a bin name, so require the suffix to
        # be the canonical rendering of the parsed index.
        return suffix == f"{num:0{self.suffix_len}x}"
class Delegations:
    """A container object storing information about all delegations.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        keys: Dictionary of keyids to Keys. Defines the keys used in ``roles``.
        roles: Ordered dictionary of role names to DelegatedRoles instances. It
            defines which keys are required to sign the metadata for a specific
            role. The roles order also defines the order that role delegations
            are considered during target searches.
        succinct_roles: Contains succinct information about hash bin
            delegations. Note that succinct roles is not a TUF specification
            feature yet and setting `succinct_roles` to a value makes the
            resulting metadata non-compliant. The metadata will not be accepted
            as valid by specification compliant clients such as those built with
            python-tuf <= 1.1.0. For more information see: https://github.com/theupdateframework/taps/blob/master/tap15.md
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Exactly one of ``roles`` and ``succinct_roles`` must be set.

    Raises:
        ValueError: Invalid arguments.
    """

    def __init__(
        self,
        keys: dict[str, Key],
        roles: dict[str, DelegatedRole] | None = None,
        succinct_roles: SuccinctRoles | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self.keys = keys
        if sum(1 for v in [roles, succinct_roles] if v is not None) != 1:
            raise ValueError("One of roles and succinct_roles must be set")

        if roles is not None:
            # Iterating the dict yields the role names.
            for role in roles:
                if not role or role in TOP_LEVEL_ROLE_NAMES:
                    raise ValueError(
                        "Delegated roles cannot be empty or use top-level "
                        "role names"
                    )

        self.roles = roles
        self.succinct_roles = succinct_roles
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Delegations):
            return False

        all_attributes_check = (
            self.keys == other.keys
            and self.roles == other.roles
            and self.succinct_roles == other.succinct_roles
            and self.unrecognized_fields == other.unrecognized_fields
        )

        if self.roles is not None and other.roles is not None:
            all_attributes_check = (
                all_attributes_check
                # Order of the delegated roles matters (see issue #1788).
                and list(self.roles.items()) == list(other.roles.items())
            )

        return all_attributes_check

    def __hash__(self) -> int:
        # Dicts are unhashable, so each mapping is converted to a hashable
        # view that stays consistent with __eq__:
        # - keys: order-insensitive, like dict equality.
        # - roles: order-sensitive, because __eq__ compares role order.
        # - unrecognized_fields: only the field names are hashed, as the
        #   values are arbitrary data that may itself be unhashable. Equal
        #   dicts always have equal key sets, so equal objects hash equal.
        roles = None if self.roles is None else tuple(self.roles.items())
        return hash(
            (
                frozenset(self.keys.items()),
                roles,
                self.succinct_roles,
                frozenset(self.unrecognized_fields),
            )
        )

    @classmethod
    def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations:
        """Create ``Delegations`` object from its json/dict representation.

        Recognized entries are popped from ``delegations_dict``; whatever
        remains is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        keys = delegations_dict.pop("keys")
        keys_res = {
            keyid: Key.from_dict(keyid, key_dict)
            for keyid, key_dict in keys.items()
        }
        roles = delegations_dict.pop("roles", None)
        roles_res: dict[str, DelegatedRole] | None = None

        if roles is not None:
            roles_res = {}
            for role_dict in roles:
                new_role = DelegatedRole.from_dict(role_dict)
                if new_role.name in roles_res:
                    raise ValueError(f"Duplicate role {new_role.name}")
                roles_res[new_role.name] = new_role

        succinct_roles_dict = delegations_dict.pop("succinct_roles", None)
        succinct_roles_info = None
        if succinct_roles_dict is not None:
            succinct_roles_info = SuccinctRoles.from_dict(succinct_roles_dict)

        # All fields left in the delegations_dict are unrecognized.
        return cls(keys_res, roles_res, succinct_roles_info, delegations_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        keys = {keyid: key.to_dict() for keyid, key in self.keys.items()}
        res_dict: dict[str, Any] = {
            "keys": keys,
            **self.unrecognized_fields,
        }
        if self.roles is not None:
            # Roles are serialized as a list, in the dict's iteration order.
            roles = [role_obj.to_dict() for role_obj in self.roles.values()]
            res_dict["roles"] = roles
        elif self.succinct_roles is not None:
            res_dict["succinct_roles"] = self.succinct_roles.to_dict()

        return res_dict

    def get_roles_for_target(
        self, target_filepath: str
    ) -> Iterator[tuple[str, bool]]:
        """Given ``target_filepath`` get names and terminating status of all
        delegated roles who are responsible for it.

        Args:
            target_filepath: URL path to a target file, relative to a base
                targets URL.
        """
        if self.roles is not None:
            for role in self.roles.values():
                if role.is_delegated_path(target_filepath):
                    yield role.name, role.terminating

        elif self.succinct_roles is not None:
            # We consider all succinct_roles as terminating.
            # For more information read TAP 15.
            yield self.succinct_roles.get_role_for_target(target_filepath), True
class TargetFile(BaseFile):
    """A container with information about a particular target file.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        length: Length of the target file in bytes.
        hashes: Dictionary of hash algorithm names to hashes of the target
            file content.
        path: URL path to a target file, relative to a base targets URL.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError, TypeError: Invalid arguments.
    """

    def __init__(
        self,
        length: int,
        hashes: dict[str, str],
        path: str,
        unrecognized_fields: dict[str, Any] | None = None,
    ):
        self._validate_length(length)
        self._validate_hashes(hashes)

        self.length = length
        self.hashes = hashes
        self.path = path
        if unrecognized_fields is None:
            unrecognized_fields = {}

        self.unrecognized_fields = unrecognized_fields

    @property
    def custom(self) -> Any:  # noqa: ANN401
        """Get implementation specific data related to the target.

        python-tuf does not use or validate this data.
        """
        return self.unrecognized_fields.get("custom")

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TargetFile):
            return False

        return (
            self.length == other.length
            and self.hashes == other.hashes
            and self.path == other.path
            and self.unrecognized_fields == other.unrecognized_fields
        )

    def __hash__(self) -> int:
        # Dicts are unhashable, so hash order-insensitive views of them
        # (matching dict equality in __eq__). For unrecognized_fields only the
        # field names are used, as the values are arbitrary data that may
        # itself be unhashable; equal dicts always have equal key sets, so
        # equal objects still hash equal.
        return hash(
            (
                self.length,
                frozenset(self.hashes.items()),
                self.path,
                frozenset(self.unrecognized_fields),
            )
        )

    @classmethod
    def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile:
        """Create ``TargetFile`` object from its json/dict representation.

        Recognized entries are popped from ``target_dict``; whatever remains
        is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        length = target_dict.pop("length")
        hashes = target_dict.pop("hashes")

        # All fields left in the target_dict are unrecognized.
        return cls(length, hashes, path, target_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serializable dictionary representation of self."""
        return {
            "length": self.length,
            "hashes": self.hashes,
            **self.unrecognized_fields,
        }

    @classmethod
    def from_file(
        cls,
        target_file_path: str,
        local_path: str,
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from a file.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            local_path: Local path to target file content.
            hash_algorithms: Hash algorithms to calculate hashes with. If not
                specified, "sha256" is used.

        Raises:
            FileNotFoundError: The file doesn't exist.
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        with open(local_path, "rb") as file:
            return cls.from_data(target_file_path, file, hash_algorithms)

    @classmethod
    def from_data(
        cls,
        target_file_path: str,
        data: bytes | IO[bytes],
        hash_algorithms: list[str] | None = None,
    ) -> TargetFile:
        """Create ``TargetFile`` object from bytes.

        Args:
            target_file_path: URL path to a target file, relative to a base
                targets URL.
            data: Target file content.
            hash_algorithms: Hash algorithms to create the hashes with. If not
                specified, "sha256" is used.

        Raises:
            ValueError: The hash algorithms list contains an unsupported
                algorithm.
        """
        length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
        return cls(length, hashes, target_file_path)

    def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None:
        """Verify that length and hashes of ``data`` match expected values.

        Args:
            data: Target file object or its content in bytes.

        Raises:
            LengthOrHashMismatchError: Calculated length or hashes do not
                match expected values or hash algorithm is not supported.
        """
        self._verify_length(data, self.length)
        self._verify_hashes(data, self.hashes)

    def get_prefixed_paths(self) -> list[str]:
        """
        Return hash-prefixed URL path fragments for the target file path.

        One path is produced per entry in ``hashes``: the hash value and a
        "." are inserted in front of the file name, leaving any parent
        directory part untouched.
        """
        # rpartition gives an empty parent and separator when the path has no
        # "/", so the same format string works for bare file names too.
        parent, sep, name = self.path.rpartition("/")
        return [
            f"{parent}{sep}{hash_value}.{name}"
            for hash_value in self.hashes.values()
        ]
class Targets(Signed, _DelegatorMixin):
    """A container for the signed part of targets metadata.

    Targets contains verifying information about target files and also
    delegates responsibility to other Targets roles.

    *All parameters named below are not just constructor arguments but also
    instance attributes.*

    Args:
        version: Metadata version number. Default is 1.
        spec_version: Supported TUF specification version. Default is the
            version currently supported by the library.
        expires: Metadata expiry date. Default is current date and time.
        targets: Dictionary of target filenames to TargetFiles. Default is an
            empty dictionary.
        delegations: Defines how this Targets delegates responsibility to other
            Targets Metadata files. Default is None.
        unrecognized_fields: Dictionary of all attributes that are not managed
            by TUF Metadata API

    Raises:
        ValueError: Invalid arguments.
    """

    type = _TARGETS

    def __init__(
        self,
        version: int | None = None,
        spec_version: str | None = None,
        expires: datetime | None = None,
        targets: dict[str, TargetFile] | None = None,
        delegations: Delegations | None = None,
        unrecognized_fields: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(version, spec_version, expires, unrecognized_fields)
        self.targets = targets if targets is not None else {}
        self.delegations = delegations

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Targets):
            return False

        return (
            super().__eq__(other)
            and self.targets == other.targets
            and self.delegations == other.delegations
        )

    def __hash__(self) -> int:
        # A dict is unhashable, so hash an order-insensitive frozenset of its
        # items instead; this mirrors the dict equality used in __eq__.
        return hash(
            (
                super().__hash__(),
                frozenset(self.targets.items()),
                self.delegations,
            )
        )

    @classmethod
    def from_dict(cls, signed_dict: dict[str, Any]) -> Targets:
        """Create ``Targets`` object from its json/dict representation.

        Recognized entries are popped from ``signed_dict``; whatever remains
        is passed on as unrecognized fields.

        Raises:
            ValueError, KeyError, TypeError: Invalid arguments.
        """
        common_args = cls._common_fields_from_dict(signed_dict)
        targets = signed_dict.pop(_TARGETS)
        # "delegations" is optional; its absence means no delegations.
        try:
            delegations_dict = signed_dict.pop("delegations")
        except KeyError:
            delegations = None
        else:
            delegations = Delegations.from_dict(delegations_dict)
        res_targets = {
            target_path: TargetFile.from_dict(target_info, target_path)
            for target_path, target_info in targets.items()
        }
        # All fields left in signed_dict are unrecognized.
        return cls(*common_args, res_targets, delegations, signed_dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the dict representation of self."""
        targets_dict = self._common_fields_to_dict()
        targets_dict[_TARGETS] = {
            target_path: target_file_obj.to_dict()
            for target_path, target_file_obj in self.targets.items()
        }
        if self.delegations is not None:
            targets_dict["delegations"] = self.delegations.to_dict()
        return targets_dict

    def add_key(self, key: Key, role: str | None = None) -> None:
        """Add new signing key for delegated role ``role``.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            key: Signing key to be added for ``role``.
            role: Name of the role, for which ``key`` is added.

        Raises:
            ValueError: If the argument order is wrong or if there are no
                delegated roles or if ``role`` is not delegated by this Target.
        """
        # Verify that our users are not using the old argument order.
        if isinstance(role, Key):
            raise ValueError("Role must be a string, not a Key instance")

        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            if key.keyid not in self.delegations.roles[role].keyids:
                self.delegations.roles[role].keyids.append(key.keyid)

        elif self.delegations.succinct_roles is not None:
            if key.keyid not in self.delegations.succinct_roles.keyids:
                self.delegations.succinct_roles.keyids.append(key.keyid)

        # The key store entry is (re)written even if the keyid was already
        # listed for the role.
        self.delegations.keys[key.keyid] = key

    def revoke_key(self, keyid: str, role: str | None = None) -> None:
        """Revokes key from delegated role ``role`` and updates the delegations
        key store.

        If succinct_roles is used then the ``role`` argument is not required.

        Args:
            keyid: Identifier of the key to be removed for ``role``.
            role: Name of the role, for which a signing key is removed.

        Raises:
            ValueError: If there are no delegated roles or if ``role`` is not
                delegated by this ``Target`` or if key is not used by ``role``
                or if key with id ``keyid`` is not used by succinct roles.
        """
        if self.delegations is None:
            raise ValueError(f"Delegated role {role} doesn't exist")

        if self.delegations.roles is not None:
            if role not in self.delegations.roles:
                raise ValueError(f"Delegated role {role} doesn't exist")
            if keyid not in self.delegations.roles[role].keyids:
                raise ValueError(f"Key with id {keyid} is not used by {role}")

            self.delegations.roles[role].keyids.remove(keyid)
            # Keep the key in the key store while any other delegated role
            # still references it.
            if any(
                keyid in keyinfo.keyids
                for keyinfo in self.delegations.roles.values()
            ):
                return

        elif self.delegations.succinct_roles is not None:
            if keyid not in self.delegations.succinct_roles.keyids:
                raise ValueError(
                    f"Key with id {keyid} is not used by succinct_roles"
                )

            self.delegations.succinct_roles.keyids.remove(keyid)

        del self.delegations.keys[keyid]

    def get_delegated_role(self, delegated_role: str) -> Role:
        """Return the role object for the given delegated role.

        Raises ValueError if delegated_role is not actually delegated.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")

        role: Role | None = None
        if self.delegations.roles is not None:
            role = self.delegations.roles.get(delegated_role)
        elif self.delegations.succinct_roles is not None:
            # All bins share the single SuccinctRoles object as their role.
            succinct = self.delegations.succinct_roles
            if succinct.is_delegated_role(delegated_role):
                role = succinct

        if not role:
            raise ValueError(f"Delegated role {delegated_role} not found")

        return role

    def get_key(self, keyid: str) -> Key:
        """Return the key object for the given keyid.

        Raises ValueError if there are no delegations or if ``keyid`` is not
        in the delegations key store.
        """
        if self.delegations is None:
            raise ValueError("No delegations found")
        if keyid not in self.delegations.keys:
            raise ValueError(f"Key {keyid} not found")

        return self.delegations.keys[keyid]