1"""Dead Simple Signing Envelope"""
2
3from __future__ import annotations
4
5import logging
6from typing import Any
7
8from securesystemslib import exceptions
9from securesystemslib._internal.utils import b64dec, b64enc
10from securesystemslib.signer import Key, Signature, Signer
11
12logger = logging.getLogger(__name__)
13
14
class Envelope:
    """DSSE Envelope to provide interface for signing arbitrary data.

    Attributes:
        payload: Arbitrary byte sequence of serialized body.
        payload_type: string that identifies how to interpret payload.
        signatures: dict mapping a Signature keyid to its Signature.

    """

    def __init__(
        self,
        payload: bytes,
        payload_type: str,
        signatures: dict[str, Signature],
    ):
        self.payload = payload
        self.payload_type = payload_type
        self.signatures = signatures

    def __eq__(self, other: Any) -> bool:
        """Envelopes are equal if payload, payload type and signatures match."""
        if not isinstance(other, Envelope):
            return False

        return (
            self.payload == other.payload
            and self.payload_type == other.payload_type
            and self.signatures == other.signatures
        )

    def __hash__(self) -> int:
        """Returns a hash that is consistent with "__eq__".

        The "signatures" dict itself is unhashable, so only its keyids take
        part in the hash. Equal envelopes have equal "signatures" dicts and
        thus equal keyid sets, which keeps the hash/eq contract intact.

        Note:
            The envelope is mutable (e.g. "sign" adds signatures), so the
            hash changes when the envelope is modified.
        """
        return hash(
            (self.payload, self.payload_type, frozenset(self.signatures))
        )

    @classmethod
    def from_dict(cls, data: dict) -> Envelope:
        """Creates a DSSE Envelope from its JSON/dict representation.

        Each entry of "signatures" is shallow-copied before it is converted,
        so the dicts passed in "data" are not modified by this method.

        Arguments:
            data: A dict containing a valid payload, payloadType and signatures

        Raises:
            KeyError: If any of the "payload", "payloadType" and "signatures"
                fields are missing from the "data", or a signature has no
                "sig" field.

            ValueError: If "signatures" contains more than one signature for
                the same keyid.

            FormatError: If signature in "signatures" is incorrect
                (presumably raised by "Signature.from_dict"; verify against
                its implementation).

        Returns:
            A "Envelope" instance.
        """

        payload = b64dec(data["payload"])
        payload_type = data["payloadType"]

        signatures: dict[str, Signature] = {}
        for entry in data["signatures"]:
            # Work on a copy: the caller's dict must not be rewritten, else a
            # second call on the same "data" would decode an already
            # hex-converted signature.
            sig_dict = dict(entry)
            # DSSE carries base64 signatures, "Signature" expects hex.
            sig_dict["sig"] = b64dec(sig_dict["sig"]).hex()
            signature = Signature.from_dict(sig_dict)
            if signature.keyid in signatures:
                raise ValueError(
                    f"Multiple signatures found for keyid {signature.keyid}"
                )
            signatures[signature.keyid] = signature

        return cls(payload, payload_type, signatures)

    def to_dict(self) -> dict:
        """Returns the JSON-serializable dictionary representation of self."""

        signatures = []
        for signature in self.signatures.values():
            sig_dict = signature.to_dict()
            # Convert the hex signature of "Signature" to DSSE's base64.
            sig_dict["sig"] = b64enc(bytes.fromhex(sig_dict["sig"]))
            signatures.append(sig_dict)

        return {
            "payload": b64enc(self.payload),
            "payloadType": self.payload_type,
            "signatures": signatures,
        }

    def pae(self) -> bytes:
        """Pre-Auth-Encoding byte sequence of self.

        Returns:
            b"DSSEv1 <len(type)> <type> <len(payload)> <payload>", where the
            payload type is UTF-8 encoded and both lengths are byte counts.
        """

        # Encode first: the length field must count bytes, not characters,
        # which differ for non-ASCII payload types.
        payload_type = self.payload_type.encode("utf-8")

        return b"DSSEv1 %d %b %d %b" % (
            len(payload_type),
            payload_type,
            len(self.payload),
            self.payload,
        )

    def sign(self, signer: Signer) -> Signature:
        """Sign the payload and create the signature.

        The new signature is stored in "signatures" under its keyid,
        replacing any existing signature with the same keyid.

        Arguments:
            signer: A "Signer" class instance.

        Returns:
            A "Signature" instance.
        """

        signature = signer.sign(self.pae())
        self.signatures[signature.keyid] = signature

        return signature

    def verify(self, keys: list[Key], threshold: int) -> dict[str, Key]:
        """Verify the payload with the provided Keys.

        Arguments:
            keys: A list of public keys to verify the signatures.
            threshold: Number of signatures needed to pass the verification.

        Raises:
            ValueError: If "threshold" is not valid.
            VerificationError: If the enclosed signatures do not pass the
                verification.

        Note:
            Mandating keyid in signatures and matching them with keyid of Key
            in order to consider them for verification, is not DSSE spec
            compliant (Issue #416).

        Returns:
            A dict of the threshold of unique public keys that verified a
            signature.
        """

        accepted_keys: dict[str, Key] = {}
        pae = self.pae()

        # checks for threshold value.
        if threshold <= 0:
            raise ValueError("Threshold must be greater than 0")

        if len(keys) < threshold:
            raise ValueError("Number of keys can't be less than threshold")

        for signature in self.signatures.values():
            for key in keys:
                # If Signature keyid doesn't match with Key, skip.
                if key.keyid != signature.keyid:
                    continue

                # If a key verifies the signature, we exit and use the result.
                try:
                    key.verify_signature(signature, pae)
                except exceptions.UnverifiedSignatureError:
                    # Another key with the same keyid may still verify it.
                    continue
                else:
                    accepted_keys[key.keyid] = key
                    break

            # Break, if amount of accepted_keys are more than threshold.
            if len(accepted_keys) >= threshold:
                break

        if threshold > len(accepted_keys):
            raise exceptions.VerificationError(
                "Accepted signatures do not match threshold,"
                f" Found: {len(accepted_keys)}, Expected {threshold}"
            )

        return accepted_keys