1# Copyright 2020, New York University and the TUF contributors
2# SPDX-License-Identifier: MIT OR Apache-2.0
3
4"""Client update workflow implementation.
5
6The ``Updater`` class provides an implementation of the
7`TUF client workflow
8<https://theupdateframework.github.io/specification/latest/#detailed-client-workflow>`_.
9``Updater`` provides an API to query available targets and to download them in a
10secure manner: All downloaded files are verified by signed metadata.
11
12High-level description of ``Updater`` functionality:
13 * Initializing an ``Updater`` loads and validates the trusted local root
14 metadata: This root metadata is used as the source of trust for all other
15 metadata. Updater should always be initialized with the ``bootstrap``
16 argument: if this is not possible, it can be initialized from cache only.
17 * ``refresh()`` can optionally be called to update and load all top-level
18 metadata as described in the specification, using both locally cached
19 metadata and metadata downloaded from the remote repository. If refresh is
20 not done explicitly, it will happen automatically during the first target
21 info lookup.
22 * ``Updater`` can be used to download targets. For each target:
23
24 * ``Updater.get_targetinfo()`` is first used to find information about a
25 specific target. This will load new targets metadata as needed (from
26 local cache or remote repository).
27 * ``Updater.find_cached_target()`` can optionally be used to check if a
28 target file is already locally cached.
29 * ``Updater.download_target()`` downloads a target file and ensures it is
30 verified correct by the metadata.
31
32Note that applications using ``Updater`` should be 'single instance'
33applications: running multiple instances that use the same cache directories at
34the same time is not supported.
35
36A simple example of using the Updater to implement a Python TUF client that
37downloads target files is available in `examples/client
38<https://github.com/theupdateframework/python-tuf/tree/develop/examples/client>`_.
39
40Notes on how Updater uses HTTP by default:
41 * urllib3 is the HTTP library
42 * Typically all requests are retried by urllib3 three times (in cases where
43 this seems useful)
44 * Operating system certificate store is used for TLS, in other words
45 ``certifi`` is not used as the certificate source
46 * Proxy use can be configured with ``https_proxy`` and other similar
47 environment variables
48
49All of the HTTP decisions can be changed with ``fetcher`` argument:
50Custom ``FetcherInterface`` implementations are possible. The alternative
51``RequestsFetcher`` implementation is also provided (although deprecated).
52"""
53
54from __future__ import annotations
55
56import contextlib
57import logging
58import os
59import shutil
60import tempfile
61from pathlib import Path
62from typing import TYPE_CHECKING, cast
63from urllib import parse
64
65from tuf.api import exceptions
66from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp
67from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
68from tuf.ngclient.config import EnvelopeType, UpdaterConfig
69from tuf.ngclient.urllib3_fetcher import Urllib3Fetcher
70
71if TYPE_CHECKING:
72 from tuf.ngclient.fetcher import FetcherInterface
73
# Module-level logger named after this module; used for the debug-level
# progress messages emitted during the client update workflow.
logger = logging.getLogger(__name__)
75
76
class Updater:
    """Creates a new ``Updater`` instance and loads trusted root metadata.

    Args:
        metadata_dir: Local metadata directory. Directory must be
            writable and, unless ``bootstrap`` is provided, it must contain
            a trusted root.json file
        metadata_base_url: Base URL for all remote metadata downloads
        target_dir: Local targets directory. Directory must be writable. It
            will be used as the default target download directory by
            ``find_cached_target()`` and ``download_target()``
        target_base_url: ``Optional``; Default base URL for all remote target
            downloads. Can be individually set in ``download_target()``
        fetcher: ``Optional``; ``FetcherInterface`` implementation used to
            download both metadata and targets. Default is ``Urllib3Fetcher``
        config: ``Optional``; ``UpdaterConfig`` could be used to setup common
            configuration options.
        bootstrap: ``Optional``; initial root metadata. A bootstrap root should
            always be provided. If it is not, the current root.json in the
            metadata cache is used as the initial root.

    Raises:
        ValueError: ``config.envelope_type`` is not a supported envelope type
        OSError: Local root.json cannot be read
        RepositoryError: Local root.json is invalid
    """

    def __init__(
        self,
        metadata_dir: str,
        metadata_base_url: str,
        target_dir: str | None = None,
        target_base_url: str | None = None,
        fetcher: FetcherInterface | None = None,
        config: UpdaterConfig | None = None,
        bootstrap: bytes | None = None,
    ):
        self._dir = metadata_dir
        self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
        self.target_dir = target_dir
        if target_base_url is None:
            self._target_base_url = None
        else:
            self._target_base_url = _ensure_trailing_slash(target_base_url)

        self.config = config or UpdaterConfig()
        if fetcher is not None:
            self._fetcher = fetcher
        else:
            self._fetcher = Urllib3Fetcher(
                app_user_agent=self.config.app_user_agent
            )
        supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
        if self.config.envelope_type not in supported_envelopes:
            raise ValueError(
                f"config: envelope_type must be one of {supported_envelopes}, "
                f"got '{self.config.envelope_type}'"
            )

        if not bootstrap:
            # if no root was provided, use the cached non-versioned root.json
            bootstrap = self._load_local_metadata(Root.type)

        # Load the initial root, make sure it's cached
        self._trusted_set = TrustedMetadataSet(
            bootstrap, self.config.envelope_type
        )
        self._persist_root(self._trusted_set.root.version, bootstrap)
        self._update_root_symlink()

    def refresh(self) -> None:
        """Refresh top-level metadata.

        Downloads, verifies, and loads metadata for the top-level roles in the
        specified order (root -> timestamp -> snapshot -> targets) implementing
        all the checks required in the TUF client workflow.

        A ``refresh()`` can be done only once during the lifetime of an Updater.
        If ``refresh()`` has not been explicitly called before the first
        ``get_targetinfo()`` call, it will be done implicitly at that time.

        The metadata for delegated roles is not updated by ``refresh()``:
        that happens on demand during ``get_targetinfo()``. However, if the
        repository uses `consistent_snapshot
        <https://theupdateframework.github.io/specification/latest/#consistent-snapshots>`_,
        then all metadata downloaded by the Updater will use the same consistent
        repository state.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way
        """

        self._load_root()
        self._load_timestamp()
        self._load_snapshot()
        self._load_targets(Targets.type, Root.type)

    def _generate_target_file_path(self, targetinfo: TargetFile) -> str:
        """Return the default local file path for ``targetinfo``.

        The path is ``target_dir`` joined with the fully URL-encoded target
        path, so a target path containing "/" maps to a single file name.

        Raises:
            ValueError: ``target_dir`` was not set in the constructor
        """
        if self.target_dir is None:
            raise ValueError("target_dir must be set if filepath is not given")

        # Use URL encoded target path as filename
        filename = parse.quote(targetinfo.path, "")
        return os.path.join(self.target_dir, filename)

    def get_targetinfo(self, target_path: str) -> TargetFile | None:
        """Return ``TargetFile`` instance with information for ``target_path``.

        The return value can be used as an argument to
        ``download_target()`` and ``find_cached_target()``.

        If ``refresh()`` has not been called before calling
        ``get_targetinfo()``, the refresh will be done implicitly.

        As a side-effect this method downloads all the additional (delegated
        targets) metadata it needs to return the target information.

        Args:
            target_path: `path-relative-URL string
                <https://url.spec.whatwg.org/#path-relative-url-string>`_
                that uniquely identifies the target within the repository.

        Raises:
            OSError: New metadata could not be written to disk
            RepositoryError: Metadata failed to verify in some way
            DownloadError: Download of a metadata file failed in some way

        Returns:
            ``TargetFile`` instance or ``None``.
        """

        # Top-level targets not being loaded yet means refresh() has not run
        if Targets.type not in self._trusted_set:
            self.refresh()
        return self._preorder_depth_first_walk(target_path)

    def find_cached_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
    ) -> str | None:
        """Check whether a local file is an up to date target.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to file. If ``None``, a file path is
                generated based on ``target_dir`` constructor argument.

        Raises:
            ValueError: Incorrect arguments

        Returns:
            Local file path if the file is an up to date target file.
            ``None`` if file is not found or it is not up to date.
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)

        try:
            with open(filepath, "rb") as target_file:
                targetinfo.verify_length_and_hashes(target_file)
            return filepath
        except (OSError, exceptions.LengthOrHashMismatchError):
            # Missing/unreadable file or content mismatch: not a valid cache
            return None

    def download_target(
        self,
        targetinfo: TargetFile,
        filepath: str | None = None,
        target_base_url: str | None = None,
    ) -> str:
        """Download the target file specified by ``targetinfo``.

        Args:
            targetinfo: ``TargetFile`` from ``get_targetinfo()``.
            filepath: Local path to download into. If ``None``, the file is
                downloaded into directory defined by ``target_dir`` constructor
                argument using a generated filename. If file already exists,
                it is overwritten.
            target_base_url: Base URL used to form the final target
                download URL. Default is the value provided in ``Updater()``

        Raises:
            ValueError: Invalid arguments
            DownloadError: Download of the target file failed in some way
            RepositoryError: Downloaded target failed to be verified in some way
            OSError: Failed to write target to file

        Returns:
            Local path to downloaded file
        """

        if filepath is None:
            filepath = self._generate_target_file_path(targetinfo)
        # Make sure the destination directory exists before downloading
        Path(filepath).parent.mkdir(exist_ok=True, parents=True)

        if target_base_url is None:
            if self._target_base_url is None:
                raise ValueError(
                    "target_base_url must be set in either "
                    "download_target() or constructor"
                )

            target_base_url = self._target_base_url
        else:
            target_base_url = _ensure_trailing_slash(target_base_url)

        target_filepath = targetinfo.path
        consistent_snapshot = self._trusted_set.root.consistent_snapshot
        if consistent_snapshot and self.config.prefix_targets_with_hash:
            # Remote file name is "<hash>.<basename>": only the last path
            # component gets the prefix, the directory part is kept as is.
            hashes = list(targetinfo.hashes.values())
            dirname, sep, basename = target_filepath.rpartition("/")
            target_filepath = f"{dirname}{sep}{hashes[0]}.{basename}"
        full_url = f"{target_base_url}{target_filepath}"

        with self._fetcher.download_file(
            full_url, targetinfo.length
        ) as target_file:
            # Verify before anything is written to the destination path
            targetinfo.verify_length_and_hashes(target_file)

            target_file.seek(0)
            with open(filepath, "wb") as destination_file:
                shutil.copyfileobj(target_file, destination_file)

        logger.debug("Downloaded target %s", targetinfo.path)
        return filepath

    def _download_metadata(
        self, rolename: str, length: int, version: int | None = None
    ) -> bytes:
        """Download a metadata file and return it as bytes.

        The URL is ``<metadata_base_url><rolename>.json``, or
        ``<metadata_base_url><version>.<rolename>.json`` when ``version`` is
        given. The role name is URL-encoded and ``length`` is passed to the
        fetcher together with the URL.
        """
        encoded_name = parse.quote(rolename, "")
        if version is None:
            url = f"{self._metadata_base_url}{encoded_name}.json"
        else:
            url = f"{self._metadata_base_url}{version}.{encoded_name}.json"
        return self._fetcher.download_bytes(url, length)

    def _load_local_metadata(self, rolename: str) -> bytes:
        """Read the cached, non-versioned metadata file of ``rolename``.

        Raises:
            OSError: The file could not be opened or read
        """
        encoded_name = parse.quote(rolename, "")
        with open(os.path.join(self._dir, f"{encoded_name}.json"), "rb") as f:
            return f.read()

    def _persist_metadata(self, rolename: str, data: bytes) -> None:
        """Write metadata to disk atomically to avoid data loss.

        Use a filename _not_ prefixed with version (e.g. "timestamp.json").
        Encode the rolename to avoid issues with e.g. path separators.
        """

        encoded_name = parse.quote(rolename, "")
        filename = os.path.join(self._dir, f"{encoded_name}.json")
        self._persist_file(filename, data)

    def _persist_root(self, version: int, data: bytes) -> None:
        """Write root metadata to disk atomically to avoid data loss.

        The metadata is stored with version prefix (e.g.
        "root_history/1.root.json").
        """
        rootdir = Path(self._dir, "root_history")
        rootdir.mkdir(exist_ok=True, parents=True)
        self._persist_file(str(rootdir / f"{version}.root.json"), data)

    def _persist_file(self, filename: str, data: bytes) -> None:
        """Write a file to disk atomically to avoid data loss.

        The data is first written to a temporary file in the metadata
        directory and then moved over ``filename`` with ``os.replace()``.
        If anything fails, the temporary file is removed and the original
        exception is re-raised.

        Raises:
            OSError: The file could not be written or moved into place
        """
        temp_file_name = None

        try:
            with tempfile.NamedTemporaryFile(
                dir=self._dir, delete=False
            ) as temp_file:
                temp_file_name = temp_file.name
                temp_file.write(data)
            os.replace(temp_file_name, filename)
        except BaseException:
            # Remove the tempfile if we managed to create one, whatever the
            # failure was (not only OSError), so that no stray temp files
            # accumulate in the metadata directory. Cleanup problems are
            # suppressed so they cannot mask the original exception.
            if temp_file_name is not None:
                with contextlib.suppress(OSError):
                    os.remove(temp_file_name)
            raise

    def _update_root_symlink(self) -> None:
        """Symlink root.json to current trusted root version in root_history/

        The new link is created under a staging name and then moved over
        root.json with ``os.replace()``: the existing root.json is left
        untouched if the symlink cannot be created.

        Raises:
            OSError: The symlink could not be created or moved into place
        """
        linkname = os.path.join(self._dir, "root.json")
        version = self._trusted_set.root.version
        current = os.path.join("root_history", f"{version}.root.json")

        # The staging name does not end in ".json": it cannot collide with a
        # cached metadata file. Remove a stale leftover from an earlier
        # interrupted update before creating the new link.
        staging = f"{linkname}.new"
        with contextlib.suppress(FileNotFoundError):
            os.remove(staging)
        os.symlink(current, staging)
        try:
            os.replace(staging, linkname)
        except OSError:
            with contextlib.suppress(OSError):
                os.remove(staging)
            raise

    def _load_root(self) -> None:
        """Load root metadata.

        Sequentially load newer root metadata versions. First try to load from
        local cache and if that does not work, from the remote repository.

        If metadata is loaded from remote repository, store it in local cache.
        """

        # Update the root role
        lower_bound = self._trusted_set.root.version + 1
        upper_bound = lower_bound + self.config.max_root_rotations

        try:
            for next_version in range(lower_bound, upper_bound):
                # look for next_version in local cache
                try:
                    root_path = os.path.join(
                        self._dir, "root_history", f"{next_version}.root.json"
                    )
                    with open(root_path, "rb") as f:
                        self._trusted_set.update_root(f.read())
                    continue
                except (OSError, exceptions.RepositoryError) as e:
                    # this root did not exist locally or is invalid
                    logger.debug("Local root is not valid: %s", e)

                # next_version was not found locally, try remote
                try:
                    data = self._download_metadata(
                        Root.type,
                        self.config.root_max_length,
                        next_version,
                    )
                    self._trusted_set.update_root(data)
                    self._persist_root(next_version, data)

                except exceptions.DownloadHTTPError as exception:
                    if exception.status_code not in {403, 404}:
                        raise
                    # 404/403 means current root is newest available
                    break
        finally:
            # Make sure the non-versioned root.json links to current version
            self._update_root_symlink()

    def _load_timestamp(self) -> None:
        """Load local and remote timestamp metadata."""
        try:
            data = self._load_local_metadata(Timestamp.type)
            self._trusted_set.update_timestamp(data)
        except (OSError, exceptions.RepositoryError) as e:
            # Local timestamp does not exist or is invalid
            logger.debug("Local timestamp not valid as final: %s", e)

        # Load from remote (whether local load succeeded or not)
        data = self._download_metadata(
            Timestamp.type, self.config.timestamp_max_length
        )
        try:
            self._trusted_set.update_timestamp(data)
        except exceptions.EqualVersionNumberError:
            # If the new timestamp version is the same as current, discard the
            # new timestamp. This is normal and it shouldn't raise any error.
            return

        self._persist_metadata(Timestamp.type, data)

    def _load_snapshot(self) -> None:
        """Load local (and if needed remote) snapshot metadata."""
        try:
            data = self._load_local_metadata(Snapshot.type)
            self._trusted_set.update_snapshot(data, trusted=True)
            logger.debug("Local snapshot is valid: not downloading new one")
        except (OSError, exceptions.RepositoryError) as e:
            # Local snapshot does not exist or is invalid: update from remote
            logger.debug("Local snapshot not valid as final: %s", e)

            # Prefer the length stated in timestamp, else the configured cap
            snapshot_meta = self._trusted_set.timestamp.snapshot_meta
            length = snapshot_meta.length or self.config.snapshot_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = snapshot_meta.version

            data = self._download_metadata(Snapshot.type, length, version)
            self._trusted_set.update_snapshot(data)
            self._persist_metadata(Snapshot.type, data)

    def _load_targets(self, role: str, parent_role: str) -> Targets:
        """Load local (and if needed remote) metadata for ``role``.

        Args:
            role: Name of the targets role to load
            parent_role: Name of the role that delegates to ``role``

        Raises:
            RepositoryError: ``role`` is not listed in snapshot, or the
                downloaded metadata failed to verify

        Returns:
            The loaded ``Targets`` for ``role``
        """

        # Avoid loading 'role' more than once during "get_targetinfo"
        if role in self._trusted_set:
            return cast(Targets, self._trusted_set[role])

        try:
            data = self._load_local_metadata(role)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            logger.debug("Local %s is valid: not downloading new one", role)
            return delegated_targets
        except (OSError, exceptions.RepositoryError) as e:
            # Local 'role' does not exist or is invalid: update from remote
            logger.debug("Failed to load local %s: %s", role, e)

            snapshot = self._trusted_set.snapshot
            metainfo = snapshot.meta.get(f"{role}.json")
            if metainfo is None:
                raise exceptions.RepositoryError(
                    f"Role {role} was delegated but is not part of snapshot"
                ) from None

            # Prefer the length stated in snapshot, else the configured cap
            length = metainfo.length or self.config.targets_max_length
            version = None
            if self._trusted_set.root.consistent_snapshot:
                version = metainfo.version

            data = self._download_metadata(role, length, version)
            delegated_targets = self._trusted_set.update_delegated_targets(
                data, role, parent_role
            )
            self._persist_metadata(role, data)

            return delegated_targets

    def _preorder_depth_first_walk(
        self, target_filepath: str
    ) -> TargetFile | None:
        """
        Interrogates the tree of target delegations in order of appearance
        (which implicitly order trustworthiness), and returns the matching
        target found in the most trusted role.
        """

        # List of delegations to be interrogated. A (role, parent role) pair
        # is needed to load and verify the delegated targets metadata.
        delegations_to_visit = [(Targets.type, Root.type)]
        visited_role_names: set[str] = set()

        # Preorder depth-first traversal of the graph of target delegations.
        while (
            len(visited_role_names) <= self.config.max_delegations
            and delegations_to_visit
        ):
            # Pop the role name from the top of the stack.
            role_name, parent_role = delegations_to_visit.pop(-1)

            # Skip any visited current role to prevent cycles.
            if role_name in visited_role_names:
                logger.debug("Skipping visited current role %s", role_name)
                continue

            # The metadata for 'role_name' must be downloaded/updated before
            # its targets, delegations, and child roles can be inspected.
            targets = self._load_targets(role_name, parent_role)

            target = targets.targets.get(target_filepath)

            if target is not None:
                logger.debug("Found target in current role %s", role_name)
                return target

            # After preorder check, add current role to set of visited roles.
            visited_role_names.add(role_name)

            if targets.delegations is not None:
                child_roles_to_visit = []
                # NOTE: This may be a slow operation if there are many
                # delegated roles.
                for (
                    child_name,
                    terminating,
                ) in targets.delegations.get_roles_for_target(target_filepath):
                    logger.debug("Adding child role %s", child_name)
                    child_roles_to_visit.append((child_name, role_name))
                    if terminating:
                        # A terminating delegation drops everything that was
                        # still queued: only the children collected so far
                        # remain to be visited.
                        logger.debug("Not backtracking to other roles")
                        delegations_to_visit = []
                        break
                # Push 'child_roles_to_visit' in reverse order of appearance
                # onto 'delegations_to_visit'. Roles are popped from the end of
                # the list.
                child_roles_to_visit.reverse()
                delegations_to_visit.extend(child_roles_to_visit)

        if delegations_to_visit:
            logger.debug(
                "%d roles left to visit, but allowed at most %d delegations",
                len(delegations_to_visit),
                self.config.max_delegations,
            )

        # If this point is reached then target is not found, return None
        return None
564
565
def _ensure_trailing_slash(url: str) -> str:
    """Return ``url`` with exactly one "/" appended unless it already ends in one."""
    if url.endswith("/"):
        return url
    return url + "/"