1"""Dead Simple Signing Envelope"""
2
3from __future__ import annotations
4
5import logging
6from typing import Any
7
8from securesystemslib import exceptions
9from securesystemslib._internal.utils import b64dec, b64enc
10from securesystemslib.signer import Key, Signature, Signer
11
12logger = logging.getLogger(__name__)
13
14
class Envelope:
    """DSSE Envelope to provide interface for signing arbitrary data.

    Attributes:
        payload: Arbitrary byte sequence of serialized body.
        payload_type: string that identifies how to interpret payload.
        signatures: dict mapping the keyid of each Signature to that
            Signature.

    """

    def __init__(
        self,
        payload: bytes,
        payload_type: str,
        signatures: dict[str, Signature],
    ):
        self.payload = payload
        self.payload_type = payload_type
        self.signatures = signatures

    def __eq__(self, other: Any) -> bool:
        """Envelopes are equal if payload, payload type and signatures are."""
        if not isinstance(other, Envelope):
            return False

        return (
            self.payload == other.payload
            and self.payload_type == other.payload_type
            and self.signatures == other.signatures
        )

    @classmethod
    def from_dict(cls, data: dict) -> Envelope:
        """Creates a DSSE Envelope from its JSON/dict representation.

        Each entry of "signatures" is converted from a copy, so the "sig"
        values in the passed "data" are not rewritten by this method.

        Arguments:
            data: A dict containing a valid payload, payloadType and signatures

        Raises:
            KeyError: If any of the "payload", "payloadType" and "signatures"
                fields are missing from the "data".

            ValueError: If "signatures" contains more than one signature for
                the same keyid.

            FormatError: If signature in "signatures" is incorrect.

        Returns:
            A "Envelope" instance.
        """

        payload = b64dec(data["payload"])
        payload_type = data["payloadType"]

        signatures: dict[str, Signature] = {}
        for sig_dict in data["signatures"]:
            # The envelope carries base64 signatures, Signature uses hex.
            sig_hex = b64dec(sig_dict["sig"]).hex()

            # Convert a shallow copy: writing the hex value back into the
            # caller's dict would corrupt "data" for any later use.
            signature = Signature.from_dict({**sig_dict, "sig": sig_hex})
            if signature.keyid in signatures:
                raise ValueError(
                    f"Multiple signatures found for keyid {signature.keyid}"
                )
            signatures[signature.keyid] = signature

        return cls(payload, payload_type, signatures)

    def to_dict(self) -> dict:
        """Returns the JSON-serializable dictionary representation of self."""

        signatures = []
        for signature in self.signatures.values():
            sig_dict = signature.to_dict()
            # Inverse of from_dict: hex signature back to base64.
            sig_dict["sig"] = b64enc(bytes.fromhex(sig_dict["sig"]))
            signatures.append(sig_dict)

        return {
            "payload": b64enc(self.payload),
            "payloadType": self.payload_type,
            "signatures": signatures,
        }

    def pae(self) -> bytes:
        """Pre-Auth-Encoding byte sequence of self.

        Returns:
            b"DSSEv1 <len(type)> <type> <len(payload)> <payload>", where the
            payload type is UTF-8 encoded and both lengths are byte lengths.
        """

        # Measure the encoded bytes, not the str: for non-ASCII payload types
        # the character count differs from the byte length.
        payload_type = self.payload_type.encode("utf-8")

        return b"DSSEv1 %d %b %d %b" % (
            len(payload_type),
            payload_type,
            len(self.payload),
            self.payload,
        )

    def sign(self, signer: Signer) -> Signature:
        """Sign the payload and create the signature.

        The new signature is stored in "signatures" under its keyid,
        replacing any signature previously stored for that keyid.

        Arguments:
            signer: A "Signer" class instance.

        Returns:
            A "Signature" instance.
        """

        signature = signer.sign(self.pae())
        self.signatures[signature.keyid] = signature

        return signature

    def verify(self, keys: list[Key], threshold: int) -> dict[str, Key]:
        """Verify the payload with the provided Keys.

        Arguments:
            keys: A list of public keys to verify the signatures.
            threshold: Number of signatures needed to pass the verification.

        Raises:
            ValueError: If "threshold" is not valid.
            VerificationError: If the enclosed signatures do not pass the
                verification.

        Note:
            Mandating keyid in signatures and matching them with keyid of Key
            in order to consider them for verification, is not DSSE spec
            compliant (Issue #416).

        Returns:
            A dict of the threshold of unique public keys that verified a
            signature.
        """

        # Validate the threshold before doing any work.
        if threshold <= 0:
            raise ValueError("Threshold must be greater than 0")

        if len(keys) < threshold:
            raise ValueError("Number of keys can't be less than threshold")

        accepted_keys: dict[str, Key] = {}
        pae = self.pae()

        for signature in self.signatures.values():
            for key in keys:
                # If Signature keyid doesn't match with Key, skip.
                if key.keyid != signature.keyid:
                    continue

                # If a key verifies the signature, we exit and use the result.
                try:
                    key.verify_signature(signature, pae)
                    accepted_keys[key.keyid] = key
                    break
                except exceptions.UnverifiedSignatureError:
                    continue

            # Stop early once enough distinct keys have been accepted.
            if len(accepted_keys) >= threshold:
                break

        if threshold > len(accepted_keys):
            raise exceptions.VerificationError(
                "Accepted signatures do not match threshold,"
                f" Found: {len(accepted_keys)}, Expected {threshold}"
            )

        return accepted_keys