Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tuf/ngclient/updater.py: 16%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2020, New York University and the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
4"""Client update workflow implementation.
6The ``Updater`` class provides an implementation of the
7`TUF client workflow
8<https://theupdateframework.github.io/specification/latest/#detailed-client-workflow>`_.
9``Updater`` provides an API to query available targets and to download them in a
10secure manner: All downloaded files are verified by signed metadata.
12High-level description of ``Updater`` functionality:
13 * Initializing an ``Updater`` loads and validates the trusted local root
14 metadata: This root metadata is used as the source of trust for all other
15 metadata. Updater should always be initialized with the ``bootstrap``
16 argument: pass ``bootstrap=None`` only to explicitly opt into using the
17 cached root.json as the trust anchor.
18 * ``refresh()`` can optionally be called to update and load all top-level
19 metadata as described in the specification, using both locally cached
20 metadata and metadata downloaded from the remote repository. If refresh is
21 not done explicitly, it will happen automatically during the first target
22 info lookup.
23 * ``Updater`` can be used to download targets. For each target:
25 * ``Updater.get_targetinfo()`` is first used to find information about a
26 specific target. This will load new targets metadata as needed (from
27 local cache or remote repository).
28 * ``Updater.find_cached_target()`` can optionally be used to check if a
29 target file is already locally cached.
30 * ``Updater.download_target()`` downloads a target file and ensures it is
31 verified correct by the metadata.
33Note that applications using ``Updater`` should be 'single instance'
34applications: running multiple instances that use the same cache directories at
35the same time is not supported.
37A simple example of using the Updater to implement a Python TUF client that
38downloads target files is available in `examples/client
39<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client>`_.
41Notes on how Updater uses HTTP by default:
42 * urllib3 is the HTTP library
43 * Typically all requests are retried by urllib3 three times (in cases where
44 this seems useful)
45 * Operating system certificate store is used for TLS, in other words
46 ``certifi`` is not used as the certificate source
47 * Proxy use can be configured with ``https_proxy`` and other similar
48 environment variables
50All of the HTTP decisions can be changed with ``fetcher`` argument:
51Custom ``FetcherInterface`` implementations are possible. The alternative
52``RequestsFetcher`` implementation is also provided (although deprecated).
53"""
55from __future__ import annotations
57import contextlib
58import logging
59import os
60import shutil
61import tempfile
62from pathlib import Path
63from typing import TYPE_CHECKING, cast
64from urllib import parse
66from tuf.api import exceptions
67from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp
68from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
69from tuf.ngclient.config import EnvelopeType, UpdaterConfig
70from tuf.ngclient.urllib3_fetcher import Urllib3Fetcher
72if TYPE_CHECKING:
73 from tuf.ngclient.fetcher import FetcherInterface
75logger = logging.getLogger(__name__)
class Updater:
    """TUF client that refreshes trusted metadata and downloads targets.

    Constructing an ``Updater`` loads the initial trusted root metadata and
    stores it in the local metadata cache.

    Args:
        metadata_dir: Writable local directory used as the metadata cache.
            When ``bootstrap`` is ``None`` it must already contain a trusted
            root.json file.
        metadata_base_url: Base URL that all remote metadata is fetched from.
        target_dir: Writable local directory for targets. It is the default
            location used by ``find_cached_target()`` and
            ``download_target()`` when no explicit file path is passed.
        target_base_url: ``Optional``; default base URL for remote target
            downloads. ``download_target()`` accepts a per-call override.
        fetcher: ``Optional``; ``FetcherInterface`` implementation used for
            both metadata and target downloads. ``Urllib3Fetcher`` is used
            when omitted.
        config: ``Optional``; ``UpdaterConfig`` holding common configuration
            options. A default ``UpdaterConfig()`` is used when omitted.
        bootstrap: Required keyword-only argument with the initial root
            metadata bytes. Pass the root metadata embedded in the
            application for secure initialization; pass ``None`` only to
            explicitly opt into using the cached root.json as the trust
            anchor (not recommended for most deployments).

    Raises:
        OSError: Local root.json cannot be read
        RepositoryError: Local root.json is invalid
        ValueError: ``config.envelope_type`` is not a supported envelope type
    """

    def __init__(
        self,
        metadata_dir: str,
        metadata_base_url: str,
        target_dir: str | None = None,
        target_base_url: str | None = None,
        fetcher: FetcherInterface | None = None,
        config: UpdaterConfig | None = None,
        *,
        bootstrap: bytes | None,
    ):
        self._dir = metadata_dir
        self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
        self.target_dir = target_dir
        self._target_base_url = (
            None
            if target_base_url is None
            else _ensure_trailing_slash(target_base_url)
        )

        self.config = config or UpdaterConfig()
        # The default fetcher is only constructed when the caller gave none.
        self._fetcher = (
            fetcher
            if fetcher is not None
            else Urllib3Fetcher(app_user_agent=self.config.app_user_agent)
        )

        supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
        if self.config.envelope_type not in supported_envelopes:
            raise ValueError(
                f"config: envelope_type must be one of {supported_envelopes}, "
                f"got '{self.config.envelope_type}'"
            )

        # Without an explicit bootstrap root, fall back to the cached
        # non-versioned root.json.
        if bootstrap is None:
            bootstrap = self._load_local_metadata(Root.type)

        # Verify the initial root, then make sure it is in the cache and
        # that root.json points at it.
        self._trusted_set = TrustedMetadataSet(
            bootstrap, self.config.envelope_type
        )
        self._persist_root(self._trusted_set.root.version, bootstrap)
        self._update_root_symlink()

    def refresh(self) -> None:
        """Refresh the top-level metadata.

        Loads root, timestamp, snapshot and targets metadata, in that order,
        performing the checks required by the TUF client workflow.

        A ``refresh()`` can be done only once during the lifetime of an
        Updater. When it has not been called explicitly before the first
        ``get_targetinfo()`` call, it is performed implicitly at that point.

        Delegated roles are not touched here: their metadata is loaded on
        demand by ``get_targetinfo()``. If the repository uses
        `consistent_snapshot
        <https://theupdateframework.github.io/specification/latest/#consistent-snapshots>`_,
        all metadata downloaded by the Updater still belongs to the same
        consistent repository state.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way
        """
        self._load_root()
        self._load_timestamp()
        self._load_snapshot()
        self._load_targets(Targets.type, Root.type)

    def _generate_target_file_path(self, targetinfo: TargetFile) -> str:
        """Return the default local path for ``targetinfo`` in ``target_dir``.

        Raises:
            ValueError: ``target_dir`` was not set on this Updater
        """
        if self.target_dir is None:
            raise ValueError("target_dir must be set if filepath is not given")

        # The fully URL-encoded target path serves as a flat file name.
        return os.path.join(self.target_dir, parse.quote(targetinfo.path, ""))

    def get_targetinfo(self, target_path: str) -> TargetFile | None:
        """Look up the ``TargetFile`` describing ``target_path``.

        The result can be handed to ``download_target()`` and
        ``find_cached_target()``.

        An implicit ``refresh()`` is performed first if the top-level targets
        metadata has not been loaded yet. As a side effect, any additional
        (delegated targets) metadata needed to answer the query is loaded.

        Args:
            target_path: `path-relative-URL string
                <https://url.spec.whatwg.org/#path-relative-url-string>`_
                that uniquely identifies the target within the repository.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way

        Returns:
            ``TargetFile`` instance or ``None``.
        """
        needs_refresh = Targets.type not in self._trusted_set
        if needs_refresh:
            self.refresh()
        return self._preorder_depth_first_walk(target_path)

    def find_cached_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
    ) -> str | None:
        """Tell whether a local file is an up to date copy of a target.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to check. If ``None``, the path is derived
                from the ``target_dir`` constructor argument.

        Raises:
            ValueError: Incorrect arguments

        Returns:
            The local file path when the file matches ``targetinfo``;
            ``None`` when the file is missing, unreadable or does not match.
        """
        local_path = (
            filepath
            if filepath is not None
            else self._generate_target_file_path(targetinfo)
        )

        try:
            with open(local_path, "rb") as candidate:
                targetinfo.verify_length_and_hashes(candidate)
        except (OSError, exceptions.LengthOrHashMismatchError):
            return None
        return local_path

    def download_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
        target_base_url: str | None = None,
    ) -> str:
        """Download and verify the target described by ``targetinfo``.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to write to. If ``None``, a generated file
                name inside the ``target_dir`` constructor argument is used.
                An existing file is overwritten.
            target_base_url: Base URL used to build the download URL. The
                value given to ``Updater()`` is used when omitted.

        Raises:
            ValueError: Invalid arguments
            DownloadError: Download of the target file failed in some way
            RepositoryError: Downloaded target failed to be verified in some way
            OSError: Failed to write target to file

        Returns:
            Local path to downloaded file
        """
        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)
        Path(filepath).parent.mkdir(exist_ok=True, parents=True)

        # A per-call base URL wins over the constructor default.
        if target_base_url is not None:
            target_base_url = _ensure_trailing_slash(target_base_url)
        elif self._target_base_url is not None:
            target_base_url = self._target_base_url
        else:
            raise ValueError(
                "target_base_url must be set in either "
                "download_target() or constructor"
            )

        target_filepath = targetinfo.path
        if (
            self._trusted_set.root.consistent_snapshot
            and self.config.prefix_targets_with_hash
        ):
            # Prefix the file name (last path segment) with the first hash
            # value listed for the target.
            hashes = list(targetinfo.hashes.values())
            dirname, sep, basename = target_filepath.rpartition("/")
            target_filepath = f"{dirname}{sep}{hashes[0]}.{basename}"
        full_url = f"{target_base_url}{target_filepath}"

        with self._fetcher.download_file(
            full_url, targetinfo.length
        ) as target_file:
            # Verify first; only verified content is copied to 'filepath'.
            targetinfo.verify_length_and_hashes(target_file)

            target_file.seek(0)
            with open(filepath, "wb") as destination_file:
                shutil.copyfileobj(target_file, destination_file)

        logger.debug("Downloaded target %s", targetinfo.path)
        return filepath

    def _download_metadata(
        self, rolename: str, length: int, version: int | None = None
    ) -> bytes:
        """Fetch the metadata file for ``rolename`` and return its bytes.

        The URL file name is prefixed with ``version`` when one is given.
        """
        encoded_name = parse.quote(rolename, "")
        url = (
            f"{self._metadata_base_url}{encoded_name}.json"
            if version is None
            else f"{self._metadata_base_url}{version}.{encoded_name}.json"
        )
        return self._fetcher.download_bytes(url, length)

    def _load_local_metadata(self, rolename: str) -> bytes:
        """Read the cached (non-versioned) metadata file for ``rolename``."""
        encoded_name = parse.quote(rolename, "")
        cached_path = os.path.join(self._dir, f"{encoded_name}.json")
        with open(cached_path, "rb") as cached_file:
            return cached_file.read()

    def _persist_metadata(self, rolename: str, data: bytes) -> None:
        """Store metadata in the local cache via ``_persist_file()``.

        The file name carries no version prefix (e.g. "timestamp.json") and
        the rolename is URL-encoded so that characters such as path
        separators cannot cause trouble.
        """
        encoded_name = parse.quote(rolename, "")
        self._persist_file(
            os.path.join(self._dir, f"{encoded_name}.json"), data
        )

    def _persist_root(self, version: int, data: bytes) -> None:
        """Store root metadata in the local cache via ``_persist_file()``.

        Root files are kept with a version prefix in a dedicated directory
        (e.g. "root_history/1.root.json").
        """
        history_dir = Path(self._dir) / "root_history"
        history_dir.mkdir(exist_ok=True, parents=True)
        destination = history_dir / f"{version}.root.json"
        self._persist_file(str(destination), data)

    def _persist_file(self, filename: str, data: bytes) -> None:
        """Write ``data`` to ``filename`` atomically to avoid data loss.

        The bytes are written to a temporary file in the metadata directory
        which is then moved over ``filename``.
        """
        staging_path = None

        try:
            with tempfile.NamedTemporaryFile(
                dir=self._dir, delete=False
            ) as staging_file:
                staging_path = staging_file.name
                staging_file.write(data)
            os.replace(staging_path, filename)
        except OSError:
            # Clean up the temporary file if one was created, then let the
            # error propagate to the caller.
            if staging_path is not None:
                try:
                    os.remove(staging_path)
                except FileNotFoundError:
                    pass
            raise

    def _update_root_symlink(self) -> None:
        """Symlink root.json to current trusted root version in root_history/"""
        linkname = os.path.join(self._dir, "root.json")
        version = self._trusted_set.root.version
        # The link target is relative to the metadata directory.
        current = os.path.join("root_history", f"{version}.root.json")
        try:
            os.remove(linkname)
        except FileNotFoundError:
            pass
        os.symlink(current, linkname)

    def _load_root(self) -> None:
        """Load newer root metadata versions one by one.

        Each candidate version is looked up in the local cache first and, if
        that fails, requested from the remote repository. Roots obtained
        from the remote repository are stored in the local cache.
        """
        first_candidate = self._trusted_set.root.version + 1
        stop_before = first_candidate + self.config.max_root_rotations

        try:
            for next_version in range(first_candidate, stop_before):
                # Look for next_version in the local cache.
                try:
                    root_path = os.path.join(
                        self._dir, "root_history", f"{next_version}.root.json"
                    )
                    with open(root_path, "rb") as cached_root:
                        self._trusted_set.update_root(cached_root.read())
                except (OSError, exceptions.RepositoryError) as e:
                    # This root did not exist locally or is invalid.
                    logger.debug("Local root is not valid: %s", e)
                else:
                    continue

                # No usable local copy of next_version: try remote.
                try:
                    data = self._download_metadata(
                        Root.type,
                        self.config.root_max_length,
                        next_version,
                    )
                    self._trusted_set.update_root(data)
                    self._persist_root(next_version, data)
                except exceptions.DownloadHTTPError as exception:
                    if exception.status_code in {403, 404}:
                        # 404/403 means current root is newest available
                        break
                    raise
        finally:
            # Whatever happened, keep the non-versioned root.json pointing
            # at the currently trusted version.
            self._update_root_symlink()

    def _load_timestamp(self) -> None:
        """Load local and remote timestamp metadata."""
        try:
            cached = self._load_local_metadata(Timestamp.type)
            self._trusted_set.update_timestamp(cached)
        except (OSError, exceptions.RepositoryError) as e:
            # Local timestamp does not exist or is invalid
            logger.debug("Local timestamp not valid as final: %s", e)

        # The remote timestamp is always fetched, regardless of how the
        # local load went.
        fetched = self._download_metadata(
            Timestamp.type, self.config.timestamp_max_length
        )
        try:
            self._trusted_set.update_timestamp(fetched)
        except exceptions.EqualVersionNumberError:
            # Same version as the current timestamp: this is normal, the
            # downloaded copy is simply discarded.
            return
        else:
            self._persist_metadata(Timestamp.type, fetched)

    def _load_snapshot(self) -> None:
        """Load local (and if needed remote) snapshot metadata."""
        try:
            cached = self._load_local_metadata(Snapshot.type)
            self._trusted_set.update_snapshot(cached, trusted=True)
            logger.debug("Local snapshot is valid: not downloading new one")
        except (OSError, exceptions.RepositoryError) as e:
            # Local snapshot does not exist or is invalid: update from remote
            logger.debug("Local snapshot not valid as final: %s", e)

            snapshot_meta = self._trusted_set.timestamp.snapshot_meta
            # Fall back to the configured limit when no length is recorded.
            length = snapshot_meta.length or self.config.snapshot_max_length
            version = (
                snapshot_meta.version
                if self._trusted_set.root.consistent_snapshot
                else None
            )

            fetched = self._download_metadata(Snapshot.type, length, version)
            self._trusted_set.update_snapshot(fetched)
            self._persist_metadata(Snapshot.type, fetched)

    def _load_targets(self, role: str, parent_role: str) -> Targets:
        """Load local (and if needed remote) metadata for ``role``."""
        # A role that is already trusted is never loaded a second time.
        if role in self._trusted_set:
            return cast("Targets", self._trusted_set[role])

        try:
            cached = self._load_local_metadata(role)
            loaded = self._trusted_set.update_delegated_targets(
                cached, role, parent_role
            )
            logger.debug("Local %s is valid: not downloading new one", role)
            return loaded
        except (OSError, exceptions.RepositoryError) as e:
            # Local 'role' does not exist or is invalid: update from remote
            logger.debug("Failed to load local %s: %s", role, e)

            metainfo = self._trusted_set.snapshot.meta.get(f"{role}.json")
            if metainfo is None:
                raise exceptions.RepositoryError(
                    f"Role {role} was delegated but is not part of snapshot"
                ) from None

            # Fall back to the configured limit when no length is recorded.
            length = metainfo.length or self.config.targets_max_length
            version = (
                metainfo.version
                if self._trusted_set.root.consistent_snapshot
                else None
            )

            fetched = self._download_metadata(role, length, version)
            loaded = self._trusted_set.update_delegated_targets(
                fetched, role, parent_role
            )
            self._persist_metadata(role, fetched)

            return loaded

    def _preorder_depth_first_walk(
        self, target_filepath: str
    ) -> TargetFile | None:
        """
        Walk the tree of target delegations in order of appearance (which
        implicitly orders trustworthiness) and return the matching target
        from the most trusted role, or ``None`` if no role lists it.
        """
        # Stack of (role, parent role) pairs still to be examined; both names
        # are needed to load and verify delegated targets metadata.
        delegations_to_visit = [(Targets.type, Root.type)]
        visited_role_names: set[str] = set()

        # Preorder depth-first traversal of the graph of target delegations.
        while (
            len(visited_role_names) <= self.config.max_delegations
            and delegations_to_visit
        ):
            # Take the next pair from the top of the stack.
            role_name, parent_role = delegations_to_visit.pop()

            # Roles seen before are skipped to prevent cycles.
            if role_name in visited_role_names:
                logger.debug("Skipping visited current role %s", role_name)
                continue

            # The role's metadata has to be loaded before its targets and
            # delegations can be inspected.
            targets = self._load_targets(role_name, parent_role)

            target = targets.targets.get(target_filepath)
            if target is not None:
                logger.debug("Found target in current role %s", role_name)
                return target

            # Preorder check done: mark the role as visited.
            visited_role_names.add(role_name)

            if targets.delegations is None:
                continue

            child_roles_to_visit = []
            # NOTE: This may be a slow operation if there are many
            # delegated roles.
            for (
                child_name,
                terminating,
            ) in targets.delegations.get_roles_for_target(target_filepath):
                logger.debug("Adding child role %s", child_name)
                child_roles_to_visit.append((child_name, role_name))
                if terminating:
                    logger.debug("Not backtracking to other roles")
                    delegations_to_visit.clear()
                    break

            # Children are pushed in reverse order of appearance because
            # roles are popped from the end of the list.
            delegations_to_visit.extend(reversed(child_roles_to_visit))

        if delegations_to_visit:
            logger.debug(
                "%d roles left to visit, but allowed at most %d delegations",
                len(delegations_to_visit),
                self.config.max_delegations,
            )

        # Reaching this point means the target was not found.
        return None
def _ensure_trailing_slash(url: str) -> str:
    """Return ``url`` unchanged if it ends in "/", otherwise with "/" appended."""
    if url.endswith("/"):
        return url
    return f"{url}/"