1# Wrapper class for in-toto attestation Statement protos.
2
3import in_toto_attestation.v1.statement_pb2 as spb
4from in_toto_attestation.v1.resource_descriptor import ResourceDescriptor
5
6STATEMENT_TYPE_URI = "https://in-toto.io/Statement/v1"
7
8
9class Statement:
10 def __init__(self, subjects: list, predicate_type: str, predicate: dict) -> None:
11 self.pb = spb.Statement() # type: ignore[attr-defined]
12 self.pb.type = STATEMENT_TYPE_URI
13 self.pb.subject.extend(subjects)
14 self.pb.predicate_type = predicate_type
15 self.pb.predicate.update(predicate)
16
17 @staticmethod
18 def copy_from_pb(proto: spb.Statement) -> "Statement": # type: ignore[name-defined]
19 stmt = Statement([], "", {})
20 stmt.pb.CopyFrom(proto)
21 return stmt
22
23 def validate(self) -> None:
24 if self.pb.type != STATEMENT_TYPE_URI:
25 raise ValueError("Wrong statement type")
26
27 if len(self.pb.subject) == 0:
28 raise ValueError("At least one subject required")
29
30 # check all resource descriptors in the subject
31 subject = self.pb.subject
32 for i, rdpb in enumerate(subject):
33 rd = ResourceDescriptor.copy_from_pb(rdpb)
34 rd.validate()
35
36 # v1 statements require the digest to be set in the subject
37 if len(rd.pb.digest) == 0:
38 # return index in the subjects list in case of failure:
39 # can't assume any other fields in subject are set
40 raise ValueError("At least one digest required (subject {0})".format(i))
41
42 if self.pb.predicate_type == "":
43 raise ValueError("Predicate type required")
44
45 if len(self.pb.predicate) == 0:
46 raise ValueError("Predicate object required")