Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/ngclient/updater.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

222 statements  

1# Copyright 2020, New York University and the TUF contributors 

2# SPDX-License-Identifier: MIT OR Apache-2.0 

3 

4"""Client update workflow implementation. 

5 

6The ``Updater`` class provides an implementation of the 

7`TUF client workflow 

8<https://theupdateframework.github.io/specification/latest/#detailed-client-workflow>`_. 

9``Updater`` provides an API to query available targets and to download them in a 

10secure manner: All downloaded files are verified by signed metadata. 

11 

12High-level description of ``Updater`` functionality: 

13 * Initializing an ``Updater`` loads and validates the trusted local root 

14 metadata: This root metadata is used as the source of trust for all other 

15 metadata. Updater should always be initialized with the ``bootstrap`` 

16 argument: if this is not possible, it can be initialized from cache only. 

17 * ``refresh()`` can optionally be called to update and load all top-level 

18 metadata as described in the specification, using both locally cached 

19 metadata and metadata downloaded from the remote repository. If refresh is 

20 not done explicitly, it will happen automatically during the first target 

21 info lookup. 

22 * ``Updater`` can be used to download targets. For each target: 

23 

24 * ``Updater.get_targetinfo()`` is first used to find information about a 

25 specific target. This will load new targets metadata as needed (from 

26 local cache or remote repository). 

27 * ``Updater.find_cached_target()`` can optionally be used to check if a 

28 target file is already locally cached. 

29 * ``Updater.download_target()`` downloads a target file and ensures it is 

30 verified correct by the metadata. 

31 

32Note that applications using ``Updater`` should be 'single instance' 

33applications: running multiple instances that use the same cache directories at 

34the same time is not supported. 

35 

36A simple example of using the Updater to implement a Python TUF client that 

37downloads target files is available in `examples/client 

38<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client>`_. 

39 

40Notes on how Updater uses HTTP by default: 

41 * urllib3 is the HTTP library 

42 * Typically all requests are retried by urllib3 three times (in cases where 

43 this seems useful) 

44 * Operating system certificate store is used for TLS, in other words 

45 ``certifi`` is not used as the certificate source 

46 * Proxy use can be configured with ``https_proxy`` and other similar 

47 environment variables 

48 

49All of the HTTP decisions can be changed with ``fetcher`` argument: 

50Custom ``FetcherInterface`` implementations are possible. The alternative 

51``RequestsFetcher`` implementation is also provided (although deprecated). 

52""" 

53 

54from __future__ import annotations 

55 

56import contextlib 

57import logging 

58import os 

59import shutil 

60import tempfile 

61from pathlib import Path 

62from typing import TYPE_CHECKING, cast 

63from urllib import parse 

64 

65from tuf.api import exceptions 

66from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp 

67from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet 

68from tuf.ngclient.config import EnvelopeType, UpdaterConfig 

69from tuf.ngclient.urllib3_fetcher import Urllib3Fetcher 

70 

71if TYPE_CHECKING: 

72 from tuf.ngclient.fetcher import FetcherInterface 

73 

74logger = logging.getLogger(__name__) 

75 

76 

class Updater:
    """Creates a new ``Updater`` instance and loads trusted root metadata.

    Args:
        metadata_dir: Local metadata directory. Directory must be
            writable and it must contain a trusted root.json file
        metadata_base_url: Base URL for all remote metadata downloads
        target_dir: Local targets directory. Directory must be writable. It
            will be used as the default target download directory by
            ``find_cached_target()`` and ``download_target()``
        target_base_url: ``Optional``; Default base URL for all remote target
            downloads. Can be individually set in ``download_target()``
        fetcher: ``Optional``; ``FetcherInterface`` implementation used to
            download both metadata and targets. Default is ``Urllib3Fetcher``
        config: ``Optional``; ``UpdaterConfig`` could be used to setup common
            configuration options.
        bootstrap: ``Optional``; initial root metadata. A bootstrap root should
            always be provided. If it is not, the current root.json in the
            metadata cache is used as the initial root.

    Raises:
        ValueError: ``config.envelope_type`` is not a supported envelope type
        OSError: Local root.json cannot be read
        RepositoryError: Local root.json is invalid
    """

    def __init__(
        self,
        metadata_dir: str,
        metadata_base_url: str,
        target_dir: str | None = None,
        target_base_url: str | None = None,
        fetcher: FetcherInterface | None = None,
        config: UpdaterConfig | None = None,
        bootstrap: bytes | None = None,
    ):
        self._dir = metadata_dir
        self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
        self.target_dir = target_dir
        if target_base_url is None:
            self._target_base_url = None
        else:
            self._target_base_url = _ensure_trailing_slash(target_base_url)

        self.config = config or UpdaterConfig()
        if fetcher is not None:
            self._fetcher = fetcher
        else:
            self._fetcher = Urllib3Fetcher(
                app_user_agent=self.config.app_user_agent
            )
        supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
        if self.config.envelope_type not in supported_envelopes:
            raise ValueError(
                f"config: envelope_type must be one of {supported_envelopes}, "
                f"got '{self.config.envelope_type}'"
            )

        if not bootstrap:
            # if no root was provided, use the cached non-versioned root.json
            bootstrap = self._load_local_metadata(Root.type)

        # Load the initial root, make sure it's cached
        self._trusted_set = TrustedMetadataSet(
            bootstrap, self.config.envelope_type
        )
        self._persist_root(self._trusted_set.root.version, bootstrap)
        self._update_root_symlink()

    def refresh(self) -> None:
        """Refresh top-level metadata.

        Downloads, verifies, and loads metadata for the top-level roles in the
        specified order (root -> timestamp -> snapshot -> targets) implementing
        all the checks required in the TUF client workflow.

        A ``refresh()`` can be done only once during the lifetime of an Updater.
        If ``refresh()`` has not been explicitly called before the first
        ``get_targetinfo()`` call, it will be done implicitly at that time.

        The metadata for delegated roles is not updated by ``refresh()``:
        that happens on demand during ``get_targetinfo()``. However, if the
        repository uses `consistent_snapshot
        <https://theupdateframework.github.io/specification/latest/#consistent-snapshots>`_,
        then all metadata downloaded by the Updater will use the same consistent
        repository state.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way
        """

        self._load_root()
        self._load_timestamp()
        self._load_snapshot()
        self._load_targets(Targets.type, Root.type)

    def _generate_target_file_path(self, targetinfo: TargetFile) -> str:
        """Return the default local file path for ``targetinfo``.

        The file name is the URL encoded target path, placed directly in
        ``target_dir``.

        Raises:
            ValueError: ``target_dir`` was not set
        """
        if self.target_dir is None:
            raise ValueError("target_dir must be set if filepath is not given")

        # Use URL encoded target path as filename
        filename = parse.quote(targetinfo.path, "")
        return os.path.join(self.target_dir, filename)

    def get_targetinfo(self, target_path: str) -> TargetFile | None:
        """Return ``TargetFile`` instance with information for ``target_path``.

        The return value can be used as an argument to
        ``download_target()`` and ``find_cached_target()``.

        If ``refresh()`` has not been called before calling
        ``get_targetinfo()``, the refresh will be done implicitly.

        As a side-effect this method downloads all the additional (delegated
        targets) metadata it needs to return the target information.

        Args:
            target_path: `path-relative-URL string
                <https://url.spec.whatwg.org/#path-relative-url-string>`_
                that uniquely identifies the target within the repository.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way

        Returns:
            ``TargetFile`` instance or ``None``.
        """

        if Targets.type not in self._trusted_set:
            self.refresh()
        return self._preorder_depth_first_walk(target_path)

    def find_cached_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
    ) -> str | None:
        """Check whether a local file is an up to date target.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to file. If ``None``, a file path is
                generated based on ``target_dir`` constructor argument.

        Raises:
            ValueError: Incorrect arguments

        Returns:
            Local file path if the file is an up to date target file.
            ``None`` if file is not found or it is not up to date.
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)

        try:
            with open(filepath, "rb") as target_file:
                targetinfo.verify_length_and_hashes(target_file)
            return filepath
        except (OSError, exceptions.LengthOrHashMismatchError):
            return None

    def download_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
        target_base_url: str | None = None,
    ) -> str:
        """Download the target file specified by ``targetinfo``.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to download into. If ``None``, the file is
                downloaded into directory defined by ``target_dir`` constructor
                argument using a generated filename. If file already exists,
                it is overwritten.
            target_base_url: Base URL used to form the final target
                download URL. Default is the value provided in ``Updater()``

        Raises:
            ValueError: Invalid arguments
            DownloadError: Download of the target file failed in some way
            RepositoryError: Downloaded target failed to be verified in some way
            OSError: Failed to write target to file

        Returns:
            Local path to downloaded file
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)
        Path(filepath).parent.mkdir(exist_ok=True, parents=True)

        if target_base_url is None:
            if self._target_base_url is None:
                raise ValueError(
                    "target_base_url must be set in either "
                    "download_target() or constructor"
                )

            target_base_url = self._target_base_url
        else:
            target_base_url = _ensure_trailing_slash(target_base_url)

        target_filepath = targetinfo.path
        consistent_snapshot = self._trusted_set.root.consistent_snapshot
        if consistent_snapshot and self.config.prefix_targets_with_hash:
            # Remote file name is "<hash>.<basename>", using the first hash
            # listed in the target info
            hashes = list(targetinfo.hashes.values())
            dirname, sep, basename = target_filepath.rpartition("/")
            target_filepath = f"{dirname}{sep}{hashes[0]}.{basename}"
        full_url = f"{target_base_url}{target_filepath}"

        with self._fetcher.download_file(
            full_url, targetinfo.length
        ) as target_file:
            # Verify before anything is written to the destination path
            targetinfo.verify_length_and_hashes(target_file)

            target_file.seek(0)
            with open(filepath, "wb") as destination_file:
                shutil.copyfileobj(target_file, destination_file)

        logger.debug("Downloaded target %s", targetinfo.path)
        return filepath

    def _download_metadata(
        self, rolename: str, length: int, version: int | None = None
    ) -> bytes:
        """Download a metadata file and return it as bytes."""
        encoded_name = parse.quote(rolename, "")
        if version is None:
            url = f"{self._metadata_base_url}{encoded_name}.json"
        else:
            url = f"{self._metadata_base_url}{version}.{encoded_name}.json"
        return self._fetcher.download_bytes(url, length)

    def _load_local_metadata(self, rolename: str) -> bytes:
        """Read the non-versioned metadata file of ``rolename`` from disk.

        The rolename is URL encoded to form the file name.
        """
        encoded_name = parse.quote(rolename, "")
        with open(os.path.join(self._dir, f"{encoded_name}.json"), "rb") as f:
            return f.read()

    def _persist_metadata(self, rolename: str, data: bytes) -> None:
        """Write metadata to disk atomically to avoid data loss.

        Use a filename _not_ prefixed with version (e.g. "timestamp.json").
        Encode the rolename to avoid issues with e.g. path separators
        """

        encoded_name = parse.quote(rolename, "")
        filename = os.path.join(self._dir, f"{encoded_name}.json")
        self._persist_file(filename, data)

    def _persist_root(self, version: int, data: bytes) -> None:
        """Write root metadata to disk atomically to avoid data loss.

        The metadata is stored with version prefix (e.g.
        "root_history/1.root.json").
        """
        rootdir = Path(self._dir, "root_history")
        rootdir.mkdir(exist_ok=True, parents=True)
        self._persist_file(str(rootdir / f"{version}.root.json"), data)

    def _persist_file(self, filename: str, data: bytes) -> None:
        """Write a file to disk atomically to avoid data loss.

        The data is first written to a temporary file in the metadata
        directory, which is then moved over ``filename``. If anything fails
        the temporary file is removed and the exception is re-raised.
        """
        temp_file_name = None

        try:
            with tempfile.NamedTemporaryFile(
                dir=self._dir, delete=False
            ) as temp_file:
                temp_file_name = temp_file.name
                temp_file.write(data)
            os.replace(temp_file_name, filename)
        except BaseException:
            # Remove tempfile if we managed to create one, then let the
            # exception happen. Every failure is handled (not only OSError)
            # so that no stray temporary file is left in the metadata dir.
            if temp_file_name is not None:
                with contextlib.suppress(FileNotFoundError):
                    os.remove(temp_file_name)
            raise

    def _update_root_symlink(self) -> None:
        """Symlink root.json to current trusted root version in root_history/"""
        linkname = os.path.join(self._dir, "root.json")
        version = self._trusted_set.root.version
        # Link target is relative to the metadata directory
        current = os.path.join("root_history", f"{version}.root.json")
        with contextlib.suppress(FileNotFoundError):
            os.remove(linkname)
        os.symlink(current, linkname)

    def _load_root(self) -> None:
        """Load root metadata.

        Sequentially load newer root metadata versions. First try to load from
        local cache and if that does not work, from the remote repository.

        If metadata is loaded from remote repository, store it in local cache.
        """

        # Update the root role
        lower_bound = self._trusted_set.root.version + 1
        upper_bound = lower_bound + self.config.max_root_rotations

        try:
            for next_version in range(lower_bound, upper_bound):
                # look for next_version in local cache
                try:
                    root_path = os.path.join(
                        self._dir, "root_history", f"{next_version}.root.json"
                    )
                    with open(root_path, "rb") as f:
                        self._trusted_set.update_root(f.read())
                    continue
                except (OSError, exceptions.RepositoryError) as e:
                    # this root did not exist locally or is invalid
                    logger.debug("Local root is not valid: %s", e)

                # next_version was not found locally, try remote
                try:
                    data = self._download_metadata(
                        Root.type,
                        self.config.root_max_length,
                        next_version,
                    )
                    self._trusted_set.update_root(data)
                    self._persist_root(next_version, data)

                except exceptions.DownloadHTTPError as exception:
                    if exception.status_code not in {403, 404}:
                        raise
                    # 404/403 means current root is newest available
                    break
        finally:
            # Make sure the non-versioned root.json links to current version
            self._update_root_symlink()

    def _load_timestamp(self) -> None:
        """Load local and remote timestamp metadata."""
        try:
            data = self._load_local_metadata(Timestamp.type)
            self._trusted_set.update_timestamp(data)
        except (OSError, exceptions.RepositoryError) as e:
            # Local timestamp does not exist or is invalid
            logger.debug("Local timestamp not valid as final: %s", e)

        # Load from remote (whether local load succeeded or not)
        data = self._download_metadata(
            Timestamp.type, self.config.timestamp_max_length
        )
        try:
            self._trusted_set.update_timestamp(data)
        except exceptions.EqualVersionNumberError:
            # If the new timestamp version is the same as current, discard the
            # new timestamp. This is normal and it shouldn't raise any error.
            return

        self._persist_metadata(Timestamp.type, data)

    def _load_snapshot(self) -> None:
        """Load local (and if needed remote) snapshot metadata."""
        try:
            data = self._load_local_metadata(Snapshot.type)
            self._trusted_set.update_snapshot(data, trusted=True)
            logger.debug("Local snapshot is valid: not downloading new one")
        except (OSError, exceptions.RepositoryError) as e:
            # Local snapshot does not exist or is invalid: update from remote
            logger.debug("Local snapshot not valid as final: %s", e)

            snapshot_meta = self._trusted_set.timestamp.snapshot_meta
            length = snapshot_meta.length or self.config.snapshot_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = snapshot_meta.version

            data = self._download_metadata(Snapshot.type, length, version)
            self._trusted_set.update_snapshot(data)
            self._persist_metadata(Snapshot.type, data)

    def _load_targets(self, role: str, parent_role: str) -> Targets:
        """Load local (and if needed remote) metadata for ``role``."""

        # Avoid loading 'role' more than once during "get_targetinfo"
        if role in self._trusted_set:
            return cast(Targets, self._trusted_set[role])

        try:
            data = self._load_local_metadata(role)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            logger.debug("Local %s is valid: not downloading new one", role)
            return delegated_targets
        except (OSError, exceptions.RepositoryError) as e:
            # Local 'role' does not exist or is invalid: update from remote
            logger.debug("Failed to load local %s: %s", role, e)

            snapshot = self._trusted_set.snapshot
            metainfo = snapshot.meta.get(f"{role}.json")
            if metainfo is None:
                raise exceptions.RepositoryError(
                    f"Role {role} was delegated but is not part of snapshot"
                ) from None

            length = metainfo.length or self.config.targets_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = metainfo.version

            data = self._download_metadata(role, length, version)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            self._persist_metadata(role, data)

            return delegated_targets

    def _preorder_depth_first_walk(
        self, target_filepath: str
    ) -> TargetFile | None:
        """
        Interrogates the tree of target delegations in order of appearance
        (which implicitly order trustworthiness), and returns the matching
        target found in the most trusted role.
        """

        # List of delegations to be interrogated. A (role, parent role) pair
        # is needed to load and verify the delegated targets metadata.
        delegations_to_visit = [(Targets.type, Root.type)]
        visited_role_names: set[str] = set()

        # Preorder depth-first traversal of the graph of target delegations.
        while (
            len(visited_role_names) <= self.config.max_delegations
            and delegations_to_visit
        ):
            # Pop the role name from the top of the stack.
            role_name, parent_role = delegations_to_visit.pop(-1)

            # Skip any visited current role to prevent cycles.
            if role_name in visited_role_names:
                logger.debug("Skipping visited current role %s", role_name)
                continue

            # The metadata for 'role_name' must be downloaded/updated before
            # its targets, delegations, and child roles can be inspected.
            targets = self._load_targets(role_name, parent_role)

            target = targets.targets.get(target_filepath)

            if target is not None:
                logger.debug("Found target in current role %s", role_name)
                return target

            # After preorder check, add current role to set of visited roles.
            visited_role_names.add(role_name)

            if targets.delegations is not None:
                child_roles_to_visit = []
                # NOTE: This may be a slow operation if there are many
                # delegated roles.
                for (
                    child_name,
                    terminating,
                ) in targets.delegations.get_roles_for_target(target_filepath):
                    logger.debug("Adding child role %s", child_name)
                    child_roles_to_visit.append((child_name, role_name))
                    if terminating:
                        logger.debug("Not backtracking to other roles")
                        delegations_to_visit = []
                        break
                # Push 'child_roles_to_visit' in reverse order of appearance
                # onto 'delegations_to_visit'. Roles are popped from the end of
                # the list.
                child_roles_to_visit.reverse()
                delegations_to_visit.extend(child_roles_to_visit)

        if delegations_to_visit:
            logger.debug(
                "%d roles left to visit, but allowed at most %d delegations",
                len(delegations_to_visit),
                self.config.max_delegations,
            )

        # If this point is reached then target is not found, return None
        return None

564 

565 

566def _ensure_trailing_slash(url: str) -> str: 

567 """Return url guaranteed to end in a slash.""" 

568 return url if url.endswith("/") else f"{url}/"