Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/ngclient/updater.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1# Copyright 2020, New York University and the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4"""Client update workflow implementation. 

5 

6The ``Updater`` class provides an implementation of the 

7`TUF client workflow 

8<https://theupdateframework.github.io/specification/latest/#detailed-client-workflow>`_. 

9``Updater`` provides an API to query available targets and to download them in a 

10secure manner: All downloaded files are verified by signed metadata. 

11 

12High-level description of ``Updater`` functionality: 

13 * Initializing an ``Updater`` loads and validates the trusted local root 

14 metadata: This root metadata is used as the source of trust for all other 

15 metadata. Updater should always be initialized with the ``bootstrap`` 

16 argument: pass ``bootstrap=None`` only to explicitly opt into using the 

17 cached root.json as the trust anchor. 

18 * ``refresh()`` can optionally be called to update and load all top-level 

19 metadata as described in the specification, using both locally cached 

20 metadata and metadata downloaded from the remote repository. If refresh is 

21 not done explicitly, it will happen automatically during the first target 

22 info lookup. 

23 * ``Updater`` can be used to download targets. For each target: 

24 

25 * ``Updater.get_targetinfo()`` is first used to find information about a 

26 specific target. This will load new targets metadata as needed (from 

27 local cache or remote repository). 

28 * ``Updater.find_cached_target()`` can optionally be used to check if a 

29 target file is already locally cached. 

30 * ``Updater.download_target()`` downloads a target file and ensures it is 

31 verified correct by the metadata. 

32 

33Note that applications using ``Updater`` should be 'single instance' 

34applications: running multiple instances that use the same cache directories at 

35the same time is not supported. 

36 

37A simple example of using the Updater to implement a Python TUF client that 

38downloads target files is available in `examples/client 

39<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client>`_. 

40 

41Notes on how Updater uses HTTP by default: 

42 * urllib3 is the HTTP library 

43 * Typically all requests are retried by urllib3 three times (in cases where 

44 this seems useful) 

45 * Operating system certificate store is used for TLS, in other words 

46 ``certifi`` is not used as the certificate source 

47 * Proxy use can be configured with ``https_proxy`` and other similar 

48 environment variables 

49 

50All of the HTTP decisions can be changed with ``fetcher`` argument: 

51Custom ``FetcherInterface`` implementations are possible. The alternative 

52``RequestsFetcher`` implementation is also provided (although deprecated). 

53""" 

54 

55from __future__ import annotations 

56 

57import contextlib 

58import logging 

59import os 

60import shutil 

61import tempfile 

62from pathlib import Path 

63from typing import TYPE_CHECKING, cast 

64from urllib import parse 

65 

66from tuf.api import exceptions 

67from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp 

68from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet 

69from tuf.ngclient.config import EnvelopeType, UpdaterConfig 

70from tuf.ngclient.urllib3_fetcher import Urllib3Fetcher 

71 

72if TYPE_CHECKING: 

73 from tuf.ngclient.fetcher import FetcherInterface 

74 

75logger = logging.getLogger(__name__) 

76 

77 

class Updater:
    """Creates a new ``Updater`` instance and loads trusted root metadata.

    Args:
        metadata_dir: Local metadata directory. Directory must be
            writable. If ``bootstrap`` is ``None``, this directory must contain
            a trusted root.json file.
        metadata_base_url: Base URL for all remote metadata downloads
        target_dir: Local targets directory. Directory must be writable. It
            will be used as the default target download directory by
            ``find_cached_target()`` and ``download_target()``
        target_base_url: ``Optional``; Default base URL for all remote target
            downloads. Can be individually set in ``download_target()``
        fetcher: ``Optional``; ``FetcherInterface`` implementation used to
            download both metadata and targets. Default is ``Urllib3Fetcher``
        config: ``Optional``; ``UpdaterConfig`` could be used to setup common
            configuration options.
        bootstrap: Initial root metadata bytes. This argument is required.
            Pass the embedded root metadata bytes for secure initialization.
            Pass ``None`` only if you explicitly want to use the cached
            root.json as the trust anchor (not recommended for most
            deployments).

    Raises:
        ValueError: ``config.envelope_type`` is not a supported envelope type
        OSError: Local root.json cannot be read
        RepositoryError: Local root.json is invalid
    """

    def __init__(
        self,
        metadata_dir: str,
        metadata_base_url: str,
        target_dir: str | None = None,
        target_base_url: str | None = None,
        fetcher: FetcherInterface | None = None,
        config: UpdaterConfig | None = None,
        *,
        bootstrap: bytes | None,
    ):
        self._dir = metadata_dir
        # Base URLs are stored with a trailing slash so that file names can
        # simply be appended when building download URLs.
        self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
        self.target_dir = target_dir
        if target_base_url is None:
            self._target_base_url = None
        else:
            self._target_base_url = _ensure_trailing_slash(target_base_url)

        self.config = config or UpdaterConfig()
        if fetcher is not None:
            self._fetcher = fetcher
        else:
            self._fetcher = Urllib3Fetcher(
                app_user_agent=self.config.app_user_agent
            )
        supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
        if self.config.envelope_type not in supported_envelopes:
            raise ValueError(
                f"config: envelope_type must be one of {supported_envelopes}, "
                f"got '{self.config.envelope_type}'"
            )

        if bootstrap is None:
            # if no root was provided, use the cached non-versioned root.json
            bootstrap = self._load_local_metadata(Root.type)

        # Load the initial root, make sure it's cached
        self._trusted_set = TrustedMetadataSet(
            bootstrap, self.config.envelope_type
        )
        self._persist_root(self._trusted_set.root.version, bootstrap)
        self._update_root_symlink()

    def refresh(self) -> None:
        """Refresh top-level metadata.

        Downloads, verifies, and loads metadata for the top-level roles in the
        specified order (root -> timestamp -> snapshot -> targets) implementing
        all the checks required in the TUF client workflow.

        A ``refresh()`` can be done only once during the lifetime of an Updater.
        If ``refresh()`` has not been explicitly called before the first
        ``get_targetinfo()`` call, it will be done implicitly at that time.

        The metadata for delegated roles is not updated by ``refresh()``:
        that happens on demand during ``get_targetinfo()``. However, if the
        repository uses `consistent_snapshot
        <https://theupdateframework.github.io/specification/latest/#consistent-snapshots>`_,
        then all metadata downloaded by the Updater will use the same consistent
        repository state.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way
        """

        self._load_root()
        self._load_timestamp()
        self._load_snapshot()
        self._load_targets(Targets.type, Root.type)

    def _generate_target_file_path(self, targetinfo: TargetFile) -> str:
        """Return the default local file path for ``targetinfo``.

        The path is ``target_dir`` joined with the URL-encoded target path, so
        the whole target path (including any "/") becomes a single file name.

        Raises:
            ValueError: ``target_dir`` was not set in the constructor
        """
        if self.target_dir is None:
            raise ValueError("target_dir must be set if filepath is not given")

        # Use URL encoded target path as filename
        filename = parse.quote(targetinfo.path, "")
        return os.path.join(self.target_dir, filename)

    def get_targetinfo(self, target_path: str) -> TargetFile | None:
        """Return ``TargetFile`` instance with information for ``target_path``.

        The return value can be used as an argument to
        ``download_target()`` and ``find_cached_target()``.

        If ``refresh()`` has not been called before calling
        ``get_targetinfo()``, the refresh will be done implicitly.

        As a side-effect this method downloads all the additional (delegated
        targets) metadata it needs to return the target information.

        Args:
            target_path: `path-relative-URL string
                <https://url.spec.whatwg.org/#path-relative-url-string>`_
                that uniquely identifies the target within the repository.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way

        Returns:
            ``TargetFile`` instance or ``None``.
        """

        # Top-level targets not being loaded yet means refresh() has not run
        if Targets.type not in self._trusted_set:
            self.refresh()
        return self._preorder_depth_first_walk(target_path)

    def find_cached_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
    ) -> str | None:
        """Check whether a local file is an up to date target.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to file. If ``None``, a file path is
                generated based on ``target_dir`` constructor argument.

        Raises:
            ValueError: Incorrect arguments

        Returns:
            Local file path if the file is an up to date target file.
            ``None`` if file is not found or it is not up to date.
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)

        try:
            with open(filepath, "rb") as target_file:
                targetinfo.verify_length_and_hashes(target_file)
            return filepath
        except (OSError, exceptions.LengthOrHashMismatchError):
            # Unreadable/missing file or content mismatch: not a usable cache
            return None

    def download_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
        target_base_url: str | None = None,
    ) -> str:
        """Download the target file specified by ``targetinfo``.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to download into. If ``None``, the file is
                downloaded into directory defined by ``target_dir`` constructor
                argument using a generated filename. If file already exists,
                it is overwritten.
            target_base_url: Base URL used to form the final target
                download URL. Default is the value provided in ``Updater()``

        Raises:
            ValueError: Invalid arguments
            DownloadError: Download of the target file failed in some way
            RepositoryError: Downloaded target failed to be verified in some way
            OSError: Failed to write target to file

        Returns:
            Local path to downloaded file
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)
            # The default download directory is created on demand; a caller
            # supplied filepath is used as given.
            Path(filepath).parent.mkdir(exist_ok=True, parents=True)

        if target_base_url is None:
            if self._target_base_url is None:
                raise ValueError(
                    "target_base_url must be set in either "
                    "download_target() or constructor"
                )

            target_base_url = self._target_base_url
        else:
            target_base_url = _ensure_trailing_slash(target_base_url)

        target_filepath = targetinfo.path
        consistent_snapshot = self._trusted_set.root.consistent_snapshot
        if consistent_snapshot and self.config.prefix_targets_with_hash:
            # Remote file name is "<hash>.<basename>", using the first hash
            # listed for the target.
            hashes = list(targetinfo.hashes.values())
            dirname, sep, basename = target_filepath.rpartition("/")
            target_filepath = f"{dirname}{sep}{hashes[0]}.{basename}"
        full_url = f"{target_base_url}{target_filepath}"

        with self._fetcher.download_file(
            full_url, targetinfo.length
        ) as target_file:
            # Verify before anything is written to the destination path
            targetinfo.verify_length_and_hashes(target_file)

            target_file.seek(0)
            with open(filepath, "wb") as destination_file:
                shutil.copyfileobj(target_file, destination_file)

        logger.debug("Downloaded target %s", targetinfo.path)
        return filepath

    def _download_metadata(
        self, rolename: str, length: int, version: int | None = None
    ) -> bytes:
        """Download a metadata file and return it as bytes.

        The URL is ``<metadata_base_url><rolename>.json``, or
        ``<metadata_base_url><version>.<rolename>.json`` when ``version`` is
        given. ``rolename`` is URL-encoded and ``length`` is passed to the
        fetcher together with the URL.
        """
        encoded_name = parse.quote(rolename, "")
        if version is None:
            url = f"{self._metadata_base_url}{encoded_name}.json"
        else:
            url = f"{self._metadata_base_url}{version}.{encoded_name}.json"
        return self._fetcher.download_bytes(url, length)

    def _load_local_metadata(self, rolename: str) -> bytes:
        """Read the cached ``<rolename>.json`` from the metadata directory.

        The rolename is URL-encoded to form the file name, matching
        ``_persist_metadata()``.

        Raises:
            OSError: The file cannot be opened or read
        """
        encoded_name = parse.quote(rolename, "")
        with open(os.path.join(self._dir, f"{encoded_name}.json"), "rb") as f:
            return f.read()

    def _persist_metadata(self, rolename: str, data: bytes) -> None:
        """Write metadata to disk atomically to avoid data loss.

        Use a filename _not_ prefixed with version (e.g. "timestamp.json").
        Encode the rolename to avoid issues with e.g. path separators.
        """

        encoded_name = parse.quote(rolename, "")
        filename = os.path.join(self._dir, f"{encoded_name}.json")
        self._persist_file(filename, data)

    def _persist_root(self, version: int, data: bytes) -> None:
        """Write root metadata to disk atomically to avoid data loss.

        The metadata is stored with version prefix (e.g.
        "root_history/1.root.json"). The "root_history" directory is created
        if it does not exist yet.
        """
        rootdir = Path(self._dir, "root_history")
        rootdir.mkdir(exist_ok=True, parents=True)
        self._persist_file(str(rootdir / f"{version}.root.json"), data)

    def _persist_file(self, filename: str, data: bytes) -> None:
        """Write a file to disk atomically to avoid data loss.

        The data is written to a temporary file inside the metadata directory
        which is then renamed over ``filename``.

        Raises:
            OSError: The temporary file could not be written or renamed
        """
        temp_file_name = None

        try:
            with tempfile.NamedTemporaryFile(
                dir=self._dir, delete=False
            ) as temp_file:
                temp_file_name = temp_file.name
                temp_file.write(data)
            os.replace(temp_file_name, filename)
        except BaseException:
            # Remove the tempfile if we managed to create one, whatever the
            # failure was (including interruption), then let the original
            # exception propagate. A failing cleanup must not mask it.
            if temp_file_name is not None:
                with contextlib.suppress(OSError):
                    os.remove(temp_file_name)
            raise

    def _update_root_symlink(self) -> None:
        """Symlink root.json to current trusted root version in root_history/

        The new link is created under a scratch directory and then renamed
        over root.json, so an existing root.json is only replaced once the new
        link exists: a failing ``os.symlink()`` leaves the old one in place.

        Raises:
            OSError: The link could not be created or moved into place
        """
        linkname = os.path.join(self._dir, "root.json")
        version = self._trusted_set.root.version
        # Relative target: resolved against the directory containing the link
        current = os.path.join("root_history", f"{version}.root.json")

        # The scratch directory lives inside the metadata dir so the final
        # rename does not cross directories outside of it.
        with tempfile.TemporaryDirectory(dir=self._dir) as scratch_dir:
            temp_link = os.path.join(scratch_dir, "root.json")
            os.symlink(current, temp_link)
            os.replace(temp_link, linkname)

    def _load_root(self) -> None:
        """Load root metadata.

        Sequentially load newer root metadata versions. First try to load from
        local cache and if that does not work, from the remote repository.

        If metadata is loaded from remote repository, store it in local cache.
        """

        # Update the root role
        lower_bound = self._trusted_set.root.version + 1
        upper_bound = lower_bound + self.config.max_root_rotations

        try:
            for next_version in range(lower_bound, upper_bound):
                # look for next_version in local cache
                try:
                    root_path = os.path.join(
                        self._dir, "root_history", f"{next_version}.root.json"
                    )
                    with open(root_path, "rb") as f:
                        self._trusted_set.update_root(f.read())
                    continue
                except (OSError, exceptions.RepositoryError) as e:
                    # this root did not exist locally or is invalid
                    logger.debug("Local root is not valid: %s", e)

                # next_version was not found locally, try remote
                try:
                    data = self._download_metadata(
                        Root.type,
                        self.config.root_max_length,
                        next_version,
                    )
                    self._trusted_set.update_root(data)
                    self._persist_root(next_version, data)

                except exceptions.DownloadHTTPError as exception:
                    if exception.status_code not in {403, 404}:
                        raise
                    # 404/403 means current root is newest available
                    break
        finally:
            # Make sure the non-versioned root.json links to current version
            self._update_root_symlink()

    def _load_timestamp(self) -> None:
        """Load local and remote timestamp metadata."""
        try:
            data = self._load_local_metadata(Timestamp.type)
            self._trusted_set.update_timestamp(data)
        except (OSError, exceptions.RepositoryError) as e:
            # Local timestamp does not exist or is invalid
            logger.debug("Local timestamp not valid as final: %s", e)

        # Load from remote (whether local load succeeded or not)
        data = self._download_metadata(
            Timestamp.type, self.config.timestamp_max_length
        )
        try:
            self._trusted_set.update_timestamp(data)
        except exceptions.EqualVersionNumberError:
            # If the new timestamp version is the same as current, discard the
            # new timestamp. This is normal and it shouldn't raise any error.
            return

        self._persist_metadata(Timestamp.type, data)

    def _load_snapshot(self) -> None:
        """Load local (and if needed remote) snapshot metadata."""
        try:
            data = self._load_local_metadata(Snapshot.type)
            self._trusted_set.update_snapshot(data, trusted=True)
            logger.debug("Local snapshot is valid: not downloading new one")
        except (OSError, exceptions.RepositoryError) as e:
            # Local snapshot does not exist or is invalid: update from remote
            logger.debug("Local snapshot not valid as final: %s", e)

            snapshot_meta = self._trusted_set.timestamp.snapshot_meta
            # Fall back to the configured maximum if timestamp has no length
            length = snapshot_meta.length or self.config.snapshot_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = snapshot_meta.version

            data = self._download_metadata(Snapshot.type, length, version)
            self._trusted_set.update_snapshot(data)
            self._persist_metadata(Snapshot.type, data)

    def _load_targets(self, role: str, parent_role: str) -> Targets:
        """Load local (and if needed remote) metadata for ``role``."""

        # Avoid loading 'role' more than once during "get_targetinfo"
        if role in self._trusted_set:
            return cast("Targets", self._trusted_set[role])

        try:
            data = self._load_local_metadata(role)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            logger.debug("Local %s is valid: not downloading new one", role)
            return delegated_targets
        except (OSError, exceptions.RepositoryError) as e:
            # Local 'role' does not exist or is invalid: update from remote
            logger.debug("Failed to load local %s: %s", role, e)

            snapshot = self._trusted_set.snapshot
            metainfo = snapshot.meta.get(f"{role}.json")
            if metainfo is None:
                # "from None": the local load failure is not the cause here
                raise exceptions.RepositoryError(
                    f"Role {role} was delegated but is not part of snapshot"
                ) from None

            # Fall back to the configured maximum if snapshot has no length
            length = metainfo.length or self.config.targets_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = metainfo.version

            data = self._download_metadata(role, length, version)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            self._persist_metadata(role, data)

            return delegated_targets

    def _preorder_depth_first_walk(
        self, target_filepath: str
    ) -> TargetFile | None:
        """
        Interrogates the tree of target delegations in order of appearance
        (which implicitly order trustworthiness), and returns the matching
        target found in the most trusted role.
        """

        # List of delegations to be interrogated. A (role, parent role) pair
        # is needed to load and verify the delegated targets metadata.
        delegations_to_visit = [(Targets.type, Root.type)]
        visited_role_names: set[str] = set()

        # Preorder depth-first traversal of the graph of target delegations.
        while (
            len(visited_role_names) <= self.config.max_delegations
            and len(delegations_to_visit) > 0
        ):
            # Pop the role name from the top of the stack.
            role_name, parent_role = delegations_to_visit.pop(-1)

            # Skip any visited current role to prevent cycles.
            if role_name in visited_role_names:
                logger.debug("Skipping visited current role %s", role_name)
                continue

            # The metadata for 'role_name' must be downloaded/updated before
            # its targets, delegations, and child roles can be inspected.
            targets = self._load_targets(role_name, parent_role)

            target = targets.targets.get(target_filepath)

            if target is not None:
                logger.debug("Found target in current role %s", role_name)
                return target

            # After preorder check, add current role to set of visited roles.
            visited_role_names.add(role_name)

            if targets.delegations is not None:
                child_roles_to_visit = []
                # NOTE: This may be a slow operation if there are many
                # delegated roles.
                for (
                    child_name,
                    terminating,
                ) in targets.delegations.get_roles_for_target(target_filepath):
                    logger.debug("Adding child role %s", child_name)
                    child_roles_to_visit.append((child_name, role_name))
                    if terminating:
                        # Drop everything queued so far: only the children
                        # collected up to this terminating role remain.
                        logger.debug("Not backtracking to other roles")
                        delegations_to_visit = []
                        break
                # Push 'child_roles_to_visit' in reverse order of appearance
                # onto 'delegations_to_visit'. Roles are popped from the end of
                # the list.
                child_roles_to_visit.reverse()
                delegations_to_visit.extend(child_roles_to_visit)

        if len(delegations_to_visit) > 0:
            logger.debug(
                "%d roles left to visit, but allowed at most %d delegations",
                len(delegations_to_visit),
                self.config.max_delegations,
            )

        # If this point is reached then target is not found, return None
        return None

569 

570 

def _ensure_trailing_slash(url: str) -> str:
    """Return ``url`` unchanged if it ends in "/", otherwise with "/" appended."""
    if url.endswith("/"):
        return url
    return f"{url}/"