Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/metadata.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1# Copyright New York University and the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4"""The low-level Metadata API. 

5 

6The low-level Metadata API in ``tuf.api.metadata`` module contains: 

7 

8* Safe de/serialization of metadata to and from files. 

9* Access to and modification of signed metadata content. 

10* Signing metadata and verifying signatures. 

11 

12Metadata API implements functionality at the metadata file level, it does 

13not provide TUF repository or client functionality on its own (but can be used 

14to implement them). 

15 

16The API design is based on the file format defined in the `TUF specification 

17<https://theupdateframework.github.io/specification/latest/>`_ and the object 

18attributes generally follow the JSON format used in the specification. 

19 

20The above principle means that a ``Metadata`` object represents a single 

21metadata file, and has a ``signed`` attribute that is an instance of one of the 

22four top level signed classes (``Root``, ``Timestamp``, ``Snapshot`` and 

23``Targets``). To make Python type annotations useful ``Metadata`` can be 

24type constrained: e.g. the signed attribute of ``Metadata[Root]`` 

25is known to be ``Root``. 

26 

27Currently Metadata API supports JSON as the file format. 

28 

29A basic example of repository implementation using the Metadata is available in 

30`examples/repository <https://github.com/theupdateframework/python-tuf/tree/develop/examples/repository>`_. 

31""" 

32 

33from __future__ import annotations 

34 

35import logging 

36import tempfile 

37from typing import TYPE_CHECKING, Any, Generic, cast 

38 

39from securesystemslib.signer import Signature, Signer 

40from securesystemslib.storage import FilesystemBackend, StorageBackendInterface 

41 

42# Expose payload classes via ``tuf.api.metadata`` to maintain the API, 

43# even if they are unused in the local scope. 

44from tuf.api._payload import ( # noqa: F401 

45 _ROOT, 

46 _SNAPSHOT, 

47 _TARGETS, 

48 _TIMESTAMP, 

49 SPECIFICATION_VERSION, 

50 TOP_LEVEL_ROLE_NAMES, 

51 BaseFile, 

52 DelegatedRole, 

53 Delegations, 

54 Key, 

55 LengthOrHashMismatchError, 

56 MetaFile, 

57 Role, 

58 Root, 

59 RootVerificationResult, 

60 Signed, 

61 Snapshot, 

62 SuccinctRoles, 

63 T, 

64 TargetFile, 

65 Targets, 

66 Timestamp, 

67 VerificationResult, 

68) 

69from tuf.api.exceptions import UnsignedMetadataError 

70 

71if TYPE_CHECKING: 

72 from tuf.api.serialization import ( 

73 MetadataDeserializer, 

74 MetadataSerializer, 

75 SignedSerializer, 

76 ) 

77 

78logger = logging.getLogger(__name__) 

79 

80 

81class Metadata(Generic[T]): 

82 """A container for signed TUF metadata. 

83 

84 Provides methods to convert to and from dictionary, read and write to and 

85 from file and to create and verify metadata signatures. 

86 

87 ``Metadata[T]`` is a generic container type where T can be any one type of 

88 [``Root``, ``Timestamp``, ``Snapshot``, ``Targets``]. The purpose of this 

89 is to allow static type checking of the signed attribute in code using 

90 Metadata:: 

91 

92 root_md = Metadata[Root].from_file("root.json") 

93 # root_md type is now Metadata[Root]. This means signed and its 

94 # attributes like consistent_snapshot are now statically typed and the 

95 # types can be verified by static type checkers and shown by IDEs 

96 print(root_md.signed.consistent_snapshot) 

97 

98 Using a type constraint is not required but not doing so means T is not a 

99 specific type so static typing cannot happen. Note that the type constraint 

100 ``[Root]`` is not validated at runtime (as pure annotations are not 

101 available then). 

102 

103 New Metadata instances can be created from scratch with:: 

104 

105 one_day = datetime.now(timezone.utc) + timedelta(days=1) 

106 timestamp = Metadata(Timestamp(expires=one_day)) 

107 

108 Apart from ``expires`` all of the arguments to the inner constructors have 

109 reasonable default values for new metadata. 

110 

111 *All parameters named below are not just constructor arguments but also 

112 instance attributes.* 

113 

114 Args: 

115 signed: Actual metadata payload, i.e. one of ``Targets``, 

116 ``Snapshot``, ``Timestamp`` or ``Root``. 

117 signatures: Ordered dictionary of keyids to ``Signature`` objects, each 

118 signing the canonical serialized representation of ``signed``. 

119 Default is an empty dictionary. 

120 unrecognized_fields: Dictionary of all attributes that are not managed 

121 by TUF Metadata API. These fields are NOT signed and it's preferable 

122 if unrecognized fields are added to the Signed derivative classes. 

123 """ 

124 

125 def __init__( 

126 self, 

127 signed: T, 

128 signatures: dict[str, Signature] | None = None, 

129 unrecognized_fields: dict[str, Any] | None = None, 

130 ): 

131 self.signed: T = signed 

132 self.signatures = signatures if signatures is not None else {} 

133 if unrecognized_fields is None: 

134 unrecognized_fields = {} 

135 

136 self.unrecognized_fields = unrecognized_fields 

137 

138 def __eq__(self, other: object) -> bool: 

139 if not isinstance(other, Metadata): 

140 return False 

141 

142 return ( 

143 self.signatures == other.signatures 

144 # Order of the signatures matters (see issue #1788). 

145 and list(self.signatures.items()) == list(other.signatures.items()) 

146 and self.signed == other.signed 

147 and self.unrecognized_fields == other.unrecognized_fields 

148 ) 

149 

150 def __hash__(self) -> int: 

151 return hash((self.signatures, self.signed, self.unrecognized_fields)) 

152 

153 @property 

154 def signed_bytes(self) -> bytes: 

155 """Default canonical json byte representation of ``self.signed``.""" 

156 

157 # Use local scope import to avoid circular import errors 

158 from tuf.api.serialization.json import CanonicalJSONSerializer # noqa: I001, PLC0415 

159 

160 return CanonicalJSONSerializer().serialize(self.signed) 

161 

162 @classmethod 

163 def from_dict(cls, metadata: dict[str, Any]) -> Metadata[T]: 

164 """Create ``Metadata`` object from its json/dict representation. 

165 

166 Args: 

167 metadata: TUF metadata in dict representation. 

168 

169 Raises: 

170 ValueError, KeyError, TypeError: Invalid arguments. 

171 

172 Side Effect: 

173 Destroys the metadata dict passed by reference. 

174 

175 Returns: 

176 TUF ``Metadata`` object. 

177 """ 

178 

179 # Dispatch to contained metadata class on metadata _type field. 

180 _type = metadata["signed"]["_type"] 

181 

182 if _type == _TARGETS: 

183 inner_cls: type[Signed] = Targets 

184 elif _type == _SNAPSHOT: 

185 inner_cls = Snapshot 

186 elif _type == _TIMESTAMP: 

187 inner_cls = Timestamp 

188 elif _type == _ROOT: 

189 inner_cls = Root 

190 else: 

191 raise ValueError(f'unrecognized metadata type "{_type}"') 

192 

193 # Make sure signatures are unique 

194 signatures: dict[str, Signature] = {} 

195 for sig_dict in metadata.pop("signatures"): 

196 sig = Signature.from_dict(sig_dict) 

197 if sig.keyid in signatures: 

198 raise ValueError( 

199 f"Multiple signatures found for keyid {sig.keyid}" 

200 ) 

201 signatures[sig.keyid] = sig 

202 

203 return cls( 

204 # Specific type T is not known at static type check time: use cast 

205 signed=cast("T", inner_cls.from_dict(metadata.pop("signed"))), 

206 signatures=signatures, 

207 # All fields left in the metadata dict are unrecognized. 

208 unrecognized_fields=metadata, 

209 ) 

210 

211 @classmethod 

212 def from_file( 

213 cls, 

214 filename: str, 

215 deserializer: MetadataDeserializer | None = None, 

216 storage_backend: StorageBackendInterface | None = None, 

217 ) -> Metadata[T]: 

218 """Load TUF metadata from file storage. 

219 

220 Args: 

221 filename: Path to read the file from. 

222 deserializer: ``MetadataDeserializer`` subclass instance that 

223 implements the desired wireline format deserialization. Per 

224 default a ``JSONDeserializer`` is used. 

225 storage_backend: Object that implements 

226 ``securesystemslib.storage.StorageBackendInterface``. 

227 Default is ``FilesystemBackend`` (i.e. a local file). 

228 

229 Raises: 

230 StorageError: The file cannot be read. 

231 tuf.api.serialization.DeserializationError: 

232 The file cannot be deserialized. 

233 

234 Returns: 

235 TUF ``Metadata`` object. 

236 """ 

237 

238 if storage_backend is None: 

239 storage_backend = FilesystemBackend() 

240 

241 with storage_backend.get(filename) as file_obj: 

242 return cls.from_bytes(file_obj.read(), deserializer) 

243 

244 @classmethod 

245 def from_bytes( 

246 cls, 

247 data: bytes, 

248 deserializer: MetadataDeserializer | None = None, 

249 ) -> Metadata[T]: 

250 """Load TUF metadata from raw data. 

251 

252 Args: 

253 data: Metadata content. 

254 deserializer: ``MetadataDeserializer`` implementation to use. 

255 Default is ``JSONDeserializer``. 

256 

257 Raises: 

258 tuf.api.serialization.DeserializationError: 

259 The file cannot be deserialized. 

260 

261 Returns: 

262 TUF ``Metadata`` object. 

263 """ 

264 

265 if deserializer is None: 

266 # Use local scope import to avoid circular import errors 

267 from tuf.api.serialization.json import JSONDeserializer # noqa: I001, PLC0415 

268 

269 deserializer = JSONDeserializer() 

270 

271 return deserializer.deserialize(data) 

272 

273 def to_bytes(self, serializer: MetadataSerializer | None = None) -> bytes: 

274 """Return the serialized TUF file format as bytes. 

275 

276 Note that if bytes are first deserialized into ``Metadata`` and then 

277 serialized with ``to_bytes()``, the two are not required to be 

278 identical even though the signatures are guaranteed to stay valid. If 

279 byte-for-byte equivalence is required (which is the case when content 

280 hashes are used in other metadata), the original content should be used 

281 instead of re-serializing. 

282 

283 Args: 

284 serializer: ``MetadataSerializer`` instance that implements the 

285 desired serialization format. Default is ``JSONSerializer``. 

286 

287 Raises: 

288 tuf.api.serialization.SerializationError: 

289 The metadata object cannot be serialized. 

290 """ 

291 

292 if serializer is None: 

293 # Use local scope import to avoid circular import errors 

294 from tuf.api.serialization.json import JSONSerializer # noqa: I001, PLC0415 

295 

296 serializer = JSONSerializer(compact=True) 

297 

298 return serializer.serialize(self) 

299 

300 def to_dict(self) -> dict[str, Any]: 

301 """Return the dict representation of self.""" 

302 

303 signatures = [sig.to_dict() for sig in self.signatures.values()] 

304 

305 return { 

306 "signatures": signatures, 

307 "signed": self.signed.to_dict(), 

308 **self.unrecognized_fields, 

309 } 

310 

311 def to_file( 

312 self, 

313 filename: str, 

314 serializer: MetadataSerializer | None = None, 

315 storage_backend: StorageBackendInterface | None = None, 

316 ) -> None: 

317 """Write TUF metadata to file storage. 

318 

319 Note that if a file is first deserialized into ``Metadata`` and then 

320 serialized with ``to_file()``, the two files are not required to be 

321 identical even though the signatures are guaranteed to stay valid. If 

322 byte-for-byte equivalence is required (which is the case when file 

323 hashes are used in other metadata), the original file should be used 

324 instead of re-serializing. 

325 

326 Args: 

327 filename: Path to write the file to. 

328 serializer: ``MetadataSerializer`` instance that implements the 

329 desired serialization format. Default is ``JSONSerializer``. 

330 storage_backend: ``StorageBackendInterface`` implementation. Default 

331 is ``FilesystemBackend`` (i.e. a local file). 

332 

333 Raises: 

334 tuf.api.serialization.SerializationError: 

335 The metadata object cannot be serialized. 

336 StorageError: The file cannot be written. 

337 """ 

338 

339 if storage_backend is None: 

340 storage_backend = FilesystemBackend() 

341 

342 bytes_data = self.to_bytes(serializer) 

343 

344 with tempfile.TemporaryFile() as temp_file: 

345 temp_file.write(bytes_data) 

346 storage_backend.put(temp_file, filename) 

347 

348 # Signatures. 

349 def sign( 

350 self, 

351 signer: Signer, 

352 append: bool = False, 

353 signed_serializer: SignedSerializer | None = None, 

354 ) -> Signature: 

355 """Create signature over ``signed`` and assigns it to ``signatures``. 

356 

357 Args: 

358 signer: A ``securesystemslib.signer.Signer`` object that provides a 

359 signing implementation to generate the signature. 

360 append: ``True`` if the signature should be appended to 

361 the list of signatures or replace any existing signatures. The 

362 default behavior is to replace signatures. 

363 signed_serializer: ``SignedSerializer`` that implements the desired 

364 serialization format. Default is ``CanonicalJSONSerializer``. 

365 

366 Raises: 

367 tuf.api.serialization.SerializationError: 

368 ``signed`` cannot be serialized. 

369 UnsignedMetadataError: Signing errors. 

370 

371 Returns: 

372 ``securesystemslib.signer.Signature`` object that was added into 

373 signatures. 

374 """ 

375 

376 if signed_serializer is None: 

377 bytes_data = self.signed_bytes 

378 else: 

379 bytes_data = signed_serializer.serialize(self.signed) 

380 

381 try: 

382 signature = signer.sign(bytes_data) 

383 except Exception as e: 

384 raise UnsignedMetadataError(f"Failed to sign: {e}") from e 

385 

386 if not append: 

387 self.signatures.clear() 

388 

389 self.signatures[signature.keyid] = signature 

390 

391 return signature 

392 

393 def verify_delegate( 

394 self, 

395 delegated_role: str, 

396 delegated_metadata: Metadata, 

397 signed_serializer: SignedSerializer | None = None, 

398 ) -> None: 

399 """Verify that ``delegated_metadata`` is signed with the required 

400 threshold of keys for ``delegated_role``. 

401 

402 .. deprecated:: 3.1.0 

403 Please use ``Root.verify_delegate()`` or 

404 ``Targets.verify_delegate()``. 

405 """ 

406 

407 if self.signed.type not in ["root", "targets"]: 

408 raise TypeError("Call is valid only on delegator metadata") 

409 

410 if signed_serializer is None: 

411 payload = delegated_metadata.signed_bytes 

412 

413 else: 

414 payload = signed_serializer.serialize(delegated_metadata.signed) 

415 

416 self.signed.verify_delegate( 

417 delegated_role, payload, delegated_metadata.signatures 

418 )