Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/api/metadata.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

93 statements  

1# Copyright New York University and the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4"""The low-level Metadata API. 

5 

6The low-level Metadata API in ``tuf.api.metadata`` module contains: 

7 

8* Safe de/serialization of metadata to and from files. 

9* Access to and modification of signed metadata content. 

10* Signing metadata and verifying signatures. 

11 

12Metadata API implements functionality at the metadata file level, it does 

13not provide TUF repository or client functionality on its own (but can be used 

14to implement them). 

15 

16The API design is based on the file format defined in the `TUF specification 

17<https://theupdateframework.github.io/specification/latest/>`_ and the object 

18attributes generally follow the JSON format used in the specification. 

19 

20The above principle means that a ``Metadata`` object represents a single 

21metadata file, and has a ``signed`` attribute that is an instance of one of the 

22four top level signed classes (``Root``, ``Timestamp``, ``Snapshot`` and 

23``Targets``). To make Python type annotations useful ``Metadata`` can be 

24type constrained: e.g. the signed attribute of ``Metadata[Root]`` 

25is known to be ``Root``. 

26 

27Currently Metadata API supports JSON as the file format. 

28 

29A basic example of repository implementation using the Metadata is available in 

30`examples/repository <https://github.com/theupdateframework/python-tuf/tree/develop/examples/repository>`_. 

31""" 

32 

33from __future__ import annotations 

34 

35import logging 

36import tempfile 

37from typing import TYPE_CHECKING, Any, Generic, cast 

38 

39from securesystemslib.signer import Signature, Signer 

40from securesystemslib.storage import FilesystemBackend, StorageBackendInterface 

41 

42# Expose payload classes via ``tuf.api.metadata`` to maintain the API, 

43# even if they are unused in the local scope. 

44from tuf.api._payload import ( # noqa: F401 

45 _ROOT, 

46 _SNAPSHOT, 

47 _TARGETS, 

48 _TIMESTAMP, 

49 SPECIFICATION_VERSION, 

50 TOP_LEVEL_ROLE_NAMES, 

51 BaseFile, 

52 DelegatedRole, 

53 Delegations, 

54 Key, 

55 LengthOrHashMismatchError, 

56 MetaFile, 

57 Role, 

58 Root, 

59 RootVerificationResult, 

60 Signed, 

61 Snapshot, 

62 SuccinctRoles, 

63 T, 

64 TargetFile, 

65 Targets, 

66 Timestamp, 

67 VerificationResult, 

68) 

69from tuf.api.exceptions import UnsignedMetadataError 

70 

71if TYPE_CHECKING: 

72 from tuf.api.serialization import ( 

73 MetadataDeserializer, 

74 MetadataSerializer, 

75 SignedSerializer, 

76 ) 

77 

78logger = logging.getLogger(__name__) 

79 

80 

81class Metadata(Generic[T]): 

82 """A container for signed TUF metadata. 

83 

84 Provides methods to convert to and from dictionary, read and write to and 

85 from file and to create and verify metadata signatures. 

86 

87 ``Metadata[T]`` is a generic container type where T can be any one type of 

88 [``Root``, ``Timestamp``, ``Snapshot``, ``Targets``]. The purpose of this 

89 is to allow static type checking of the signed attribute in code using 

90 Metadata:: 

91 

92 root_md = Metadata[Root].from_file("root.json") 

93 # root_md type is now Metadata[Root]. This means signed and its 

94 # attributes like consistent_snapshot are now statically typed and the 

95 # types can be verified by static type checkers and shown by IDEs 

96 print(root_md.signed.consistent_snapshot) 

97 

98 Using a type constraint is not required but not doing so means T is not a 

99 specific type so static typing cannot happen. Note that the type constraint 

100 ``[Root]`` is not validated at runtime (as pure annotations are not 

101 available then). 

102 

103 New Metadata instances can be created from scratch with:: 

104 

105 one_day = datetime.now(timezone.utc) + timedelta(days=1) 

106 timestamp = Metadata(Timestamp(expires=one_day)) 

107 

108 Apart from ``expires`` all of the arguments to the inner constructors have 

109 reasonable default values for new metadata. 

110 

111 *All parameters named below are not just constructor arguments but also 

112 instance attributes.* 

113 

114 Args: 

115 signed: Actual metadata payload, i.e. one of ``Targets``, 

116 ``Snapshot``, ``Timestamp`` or ``Root``. 

117 signatures: Ordered dictionary of keyids to ``Signature`` objects, each 

118 signing the canonical serialized representation of ``signed``. 

119 Default is an empty dictionary. 

120 unrecognized_fields: Dictionary of all attributes that are not managed 

121 by TUF Metadata API. These fields are NOT signed and it's preferable 

122 if unrecognized fields are added to the Signed derivative classes. 

123 """ 

124 

125 def __init__( 

126 self, 

127 signed: T, 

128 signatures: dict[str, Signature] | None = None, 

129 unrecognized_fields: dict[str, Any] | None = None, 

130 ): 

131 self.signed: T = signed 

132 self.signatures = signatures if signatures is not None else {} 

133 if unrecognized_fields is None: 

134 unrecognized_fields = {} 

135 

136 self.unrecognized_fields = unrecognized_fields 

137 

138 def __eq__(self, other: object) -> bool: 

139 if not isinstance(other, Metadata): 

140 return False 

141 

142 return ( 

143 self.signatures == other.signatures 

144 # Order of the signatures matters (see issue #1788). 

145 and list(self.signatures.items()) == list(other.signatures.items()) 

146 and self.signed == other.signed 

147 and self.unrecognized_fields == other.unrecognized_fields 

148 ) 

149 

150 @property 

151 def signed_bytes(self) -> bytes: 

152 """Default canonical json byte representation of ``self.signed``.""" 

153 

154 # Use local scope import to avoid circular import errors 

155 from tuf.api.serialization.json import CanonicalJSONSerializer 

156 

157 return CanonicalJSONSerializer().serialize(self.signed) 

158 

159 @classmethod 

160 def from_dict(cls, metadata: dict[str, Any]) -> Metadata[T]: 

161 """Create ``Metadata`` object from its json/dict representation. 

162 

163 Args: 

164 metadata: TUF metadata in dict representation. 

165 

166 Raises: 

167 ValueError, KeyError, TypeError: Invalid arguments. 

168 

169 Side Effect: 

170 Destroys the metadata dict passed by reference. 

171 

172 Returns: 

173 TUF ``Metadata`` object. 

174 """ 

175 

176 # Dispatch to contained metadata class on metadata _type field. 

177 _type = metadata["signed"]["_type"] 

178 

179 if _type == _TARGETS: 

180 inner_cls: type[Signed] = Targets 

181 elif _type == _SNAPSHOT: 

182 inner_cls = Snapshot 

183 elif _type == _TIMESTAMP: 

184 inner_cls = Timestamp 

185 elif _type == _ROOT: 

186 inner_cls = Root 

187 else: 

188 raise ValueError(f'unrecognized metadata type "{_type}"') 

189 

190 # Make sure signatures are unique 

191 signatures: dict[str, Signature] = {} 

192 for sig_dict in metadata.pop("signatures"): 

193 sig = Signature.from_dict(sig_dict) 

194 if sig.keyid in signatures: 

195 raise ValueError( 

196 f"Multiple signatures found for keyid {sig.keyid}" 

197 ) 

198 signatures[sig.keyid] = sig 

199 

200 return cls( 

201 # Specific type T is not known at static type check time: use cast 

202 signed=cast(T, inner_cls.from_dict(metadata.pop("signed"))), 

203 signatures=signatures, 

204 # All fields left in the metadata dict are unrecognized. 

205 unrecognized_fields=metadata, 

206 ) 

207 

208 @classmethod 

209 def from_file( 

210 cls, 

211 filename: str, 

212 deserializer: MetadataDeserializer | None = None, 

213 storage_backend: StorageBackendInterface | None = None, 

214 ) -> Metadata[T]: 

215 """Load TUF metadata from file storage. 

216 

217 Args: 

218 filename: Path to read the file from. 

219 deserializer: ``MetadataDeserializer`` subclass instance that 

220 implements the desired wireline format deserialization. Per 

221 default a ``JSONDeserializer`` is used. 

222 storage_backend: Object that implements 

223 ``securesystemslib.storage.StorageBackendInterface``. 

224 Default is ``FilesystemBackend`` (i.e. a local file). 

225 

226 Raises: 

227 StorageError: The file cannot be read. 

228 tuf.api.serialization.DeserializationError: 

229 The file cannot be deserialized. 

230 

231 Returns: 

232 TUF ``Metadata`` object. 

233 """ 

234 

235 if storage_backend is None: 

236 storage_backend = FilesystemBackend() 

237 

238 with storage_backend.get(filename) as file_obj: 

239 return cls.from_bytes(file_obj.read(), deserializer) 

240 

241 @classmethod 

242 def from_bytes( 

243 cls, 

244 data: bytes, 

245 deserializer: MetadataDeserializer | None = None, 

246 ) -> Metadata[T]: 

247 """Load TUF metadata from raw data. 

248 

249 Args: 

250 data: Metadata content. 

251 deserializer: ``MetadataDeserializer`` implementation to use. 

252 Default is ``JSONDeserializer``. 

253 

254 Raises: 

255 tuf.api.serialization.DeserializationError: 

256 The file cannot be deserialized. 

257 

258 Returns: 

259 TUF ``Metadata`` object. 

260 """ 

261 

262 if deserializer is None: 

263 # Use local scope import to avoid circular import errors 

264 from tuf.api.serialization.json import JSONDeserializer 

265 

266 deserializer = JSONDeserializer() 

267 

268 return deserializer.deserialize(data) 

269 

270 def to_bytes(self, serializer: MetadataSerializer | None = None) -> bytes: 

271 """Return the serialized TUF file format as bytes. 

272 

273 Note that if bytes are first deserialized into ``Metadata`` and then 

274 serialized with ``to_bytes()``, the two are not required to be 

275 identical even though the signatures are guaranteed to stay valid. If 

276 byte-for-byte equivalence is required (which is the case when content 

277 hashes are used in other metadata), the original content should be used 

278 instead of re-serializing. 

279 

280 Args: 

281 serializer: ``MetadataSerializer`` instance that implements the 

282 desired serialization format. Default is ``JSONSerializer``. 

283 

284 Raises: 

285 tuf.api.serialization.SerializationError: 

286 The metadata object cannot be serialized. 

287 """ 

288 

289 if serializer is None: 

290 # Use local scope import to avoid circular import errors 

291 from tuf.api.serialization.json import JSONSerializer 

292 

293 serializer = JSONSerializer(compact=True) 

294 

295 return serializer.serialize(self) 

296 

297 def to_dict(self) -> dict[str, Any]: 

298 """Return the dict representation of self.""" 

299 

300 signatures = [sig.to_dict() for sig in self.signatures.values()] 

301 

302 return { 

303 "signatures": signatures, 

304 "signed": self.signed.to_dict(), 

305 **self.unrecognized_fields, 

306 } 

307 

308 def to_file( 

309 self, 

310 filename: str, 

311 serializer: MetadataSerializer | None = None, 

312 storage_backend: StorageBackendInterface | None = None, 

313 ) -> None: 

314 """Write TUF metadata to file storage. 

315 

316 Note that if a file is first deserialized into ``Metadata`` and then 

317 serialized with ``to_file()``, the two files are not required to be 

318 identical even though the signatures are guaranteed to stay valid. If 

319 byte-for-byte equivalence is required (which is the case when file 

320 hashes are used in other metadata), the original file should be used 

321 instead of re-serializing. 

322 

323 Args: 

324 filename: Path to write the file to. 

325 serializer: ``MetadataSerializer`` instance that implements the 

326 desired serialization format. Default is ``JSONSerializer``. 

327 storage_backend: ``StorageBackendInterface`` implementation. Default 

328 is ``FilesystemBackend`` (i.e. a local file). 

329 

330 Raises: 

331 tuf.api.serialization.SerializationError: 

332 The metadata object cannot be serialized. 

333 StorageError: The file cannot be written. 

334 """ 

335 

336 if storage_backend is None: 

337 storage_backend = FilesystemBackend() 

338 

339 bytes_data = self.to_bytes(serializer) 

340 

341 with tempfile.TemporaryFile() as temp_file: 

342 temp_file.write(bytes_data) 

343 storage_backend.put(temp_file, filename) 

344 

345 # Signatures. 

346 def sign( 

347 self, 

348 signer: Signer, 

349 append: bool = False, 

350 signed_serializer: SignedSerializer | None = None, 

351 ) -> Signature: 

352 """Create signature over ``signed`` and assigns it to ``signatures``. 

353 

354 Args: 

355 signer: A ``securesystemslib.signer.Signer`` object that provides a 

356 signing implementation to generate the signature. 

357 append: ``True`` if the signature should be appended to 

358 the list of signatures or replace any existing signatures. The 

359 default behavior is to replace signatures. 

360 signed_serializer: ``SignedSerializer`` that implements the desired 

361 serialization format. Default is ``CanonicalJSONSerializer``. 

362 

363 Raises: 

364 tuf.api.serialization.SerializationError: 

365 ``signed`` cannot be serialized. 

366 UnsignedMetadataError: Signing errors. 

367 

368 Returns: 

369 ``securesystemslib.signer.Signature`` object that was added into 

370 signatures. 

371 """ 

372 

373 if signed_serializer is None: 

374 bytes_data = self.signed_bytes 

375 else: 

376 bytes_data = signed_serializer.serialize(self.signed) 

377 

378 try: 

379 signature = signer.sign(bytes_data) 

380 except Exception as e: 

381 raise UnsignedMetadataError(f"Failed to sign: {e}") from e 

382 

383 if not append: 

384 self.signatures.clear() 

385 

386 self.signatures[signature.keyid] = signature 

387 

388 return signature 

389 

390 def verify_delegate( 

391 self, 

392 delegated_role: str, 

393 delegated_metadata: Metadata, 

394 signed_serializer: SignedSerializer | None = None, 

395 ) -> None: 

396 """Verify that ``delegated_metadata`` is signed with the required 

397 threshold of keys for ``delegated_role``. 

398 

399 .. deprecated:: 3.1.0 

400 Please use ``Root.verify_delegate()`` or 

401 ``Targets.verify_delegate()``. 

402 """ 

403 

404 if self.signed.type not in ["root", "targets"]: 

405 raise TypeError("Call is valid only on delegator metadata") 

406 

407 if signed_serializer is None: 

408 payload = delegated_metadata.signed_bytes 

409 

410 else: 

411 payload = signed_serializer.serialize(delegated_metadata.signed) 

412 

413 self.signed.verify_delegate( 

414 delegated_role, payload, delegated_metadata.signatures 

415 )