1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16TUF functionality for `sigstore-python`.
17"""
18
19from __future__ import annotations
20
21import logging
22from functools import lru_cache
23from pathlib import Path
24from urllib import parse
25
26import platformdirs
27from tuf.api import exceptions as TUFExceptions
28from tuf.ngclient import Updater, UpdaterConfig # type: ignore[attr-defined]
29
30from sigstore import __version__
31from sigstore._utils import read_embedded
32from sigstore.errors import RootError, TUFError
33
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Base URLs of the TUF repositories that `TrustUpdater` accepts. It maps
# `DEFAULT_TUF_URL` to the "prod" embedded resources and `STAGING_TUF_URL`
# to the "staging" ones; any other URL is rejected.
DEFAULT_TUF_URL = "https://tuf-repo-cdn.sigstore.dev"
STAGING_TUF_URL = "https://tuf-repo-cdn.sigstage.dev"
38
39
def _get_dirs(url: str) -> tuple[Path, Path]:
    """
    Map a TUF repository URL to its local `(metadata, targets cache)` directories.

    The URL is percent-encoded with no extra "safe" characters and used as the
    final path component, below a `tuf` directory inside the platform's
    per-user data directory (metadata) and cache directory (targets).

    Nothing is created on disk here; either directory may not exist yet.
    """

    # Encode the URL first so it can serve as a single directory name.
    encoded_url = parse.quote(url, safe="")

    # (appname, appauthor) as handed to platformdirs for both lookups.
    app_ident = ("sigstore-python", "sigstore")

    metadata_dir = Path(platformdirs.user_data_dir(*app_ident), "tuf", encoded_url)
    targets_dir = Path(platformdirs.user_cache_dir(*app_ident), "tuf", encoded_url)

    return metadata_dir, targets_dir
56
57
class TrustUpdater:
    """Internal trust root (certificates and keys) downloader.

    TrustUpdater discovers the currently valid certificates and keys and
    securely downloads them from the remote TUF repository at 'url'.

    Only the sigstore.dev production (`DEFAULT_TUF_URL`) and staging
    (`STAGING_TUF_URL`) repositories are accepted: the initial `root.json`
    and a fallback `trusted_root.json` for them are read from the resources
    embedded in the application.
    """

    # Per-instance memo for `get_trusted_root_path`; stays `None` until the
    # first successful lookup. Kept on the instance (rather than in a
    # class-level `lru_cache`) so that the cache does not hold a strong
    # reference to every `TrustUpdater` ever created.
    _trusted_root_path: str | None = None

    def __init__(self, url: str, offline: bool = False) -> None:
        """
        Create a new `TrustUpdater`, pulling from the given `url`.

        The URL is expected to match one of `sigstore-python`'s known TUF
        roots, i.e. for the production or staging Sigstore TUF repos.

        If not `offline`, TrustUpdater will update the TUF metadata from
        the remote repository.

        Raises `RootError` if `url` is not a known TUF root or if a required
        embedded resource is missing, and `TUFError` if refreshing the TUF
        metadata fails.
        """
        self._repo_url = url
        self._metadata_dir, self._targets_dir = _get_dirs(url)
        self._trusted_root_path = None

        # Select the embedded resource set that matches the repository.
        rsrc_prefix: str
        if self._repo_url == DEFAULT_TUF_URL:
            rsrc_prefix = "prod"
        elif self._repo_url == STAGING_TUF_URL:
            rsrc_prefix = "staging"
        else:
            raise RootError

        # Initialize targets cache dir
        self._targets_dir.mkdir(parents=True, exist_ok=True)
        trusted_root_target = self._targets_dir / "trusted_root.json"

        # Seed the cache with the embedded trusted root, but never overwrite
        # a copy that is already present.
        if not trusted_root_target.exists():
            try:
                trusted_root_json = read_embedded("trusted_root.json", rsrc_prefix)
            except FileNotFoundError as e:
                raise RootError from e

            trusted_root_target.write_bytes(trusted_root_json)

        _logger.debug(f"TUF metadata: {self._metadata_dir}")
        _logger.debug(f"TUF targets cache: {self._targets_dir}")

        # `None` means "offline": no updater is created and nothing is fetched.
        self._updater: Updater | None = None
        if offline:
            _logger.warning(
                "TUF repository is loaded in offline mode; updates will not be performed"
            )
        else:
            # Initialize and update the toplevel TUF metadata
            try:
                root_json = read_embedded("root.json", rsrc_prefix)
            except FileNotFoundError as e:
                raise RootError from e
            self._updater = Updater(
                metadata_dir=str(self._metadata_dir),
                metadata_base_url=self._repo_url,
                target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"),
                target_dir=str(self._targets_dir),
                config=UpdaterConfig(app_user_agent=f"sigstore-python/{__version__}"),
                bootstrap=root_json,
            )
            try:
                self._updater.refresh()
            except Exception as e:
                raise TUFError("Failed to refresh TUF metadata") from e

    def get_trusted_root_path(self) -> str:
        """
        Return local path to currently valid trusted root file.

        The path is resolved on the first successful call and remembered on
        this instance; later calls return the remembered value. A call that
        raises leaves nothing cached, so it can be retried.

        Raises `TUFError` if the repository has no `trusted_root.json` target
        or if downloading it fails.
        """
        if self._trusted_root_path is None:
            self._trusted_root_path = self._locate_trusted_root()
        return self._trusted_root_path

    def _locate_trusted_root(self) -> str:
        """Resolve the trusted root path, downloading the target if needed."""
        if not self._updater:
            # Offline mode: fall back to whatever is in the targets cache.
            _logger.debug("Using unverified trusted root from cache")
            return str(self._targets_dir / "trusted_root.json")

        root_info = self._updater.get_targetinfo("trusted_root.json")
        if root_info is None:
            raise TUFError("Unsupported TUF configuration: no trusted root")

        # Prefer a cached copy; only download when the updater finds none.
        path = self._updater.find_cached_target(root_info)
        if path is None:
            try:
                path = self._updater.download_target(root_info)
            except (
                TUFExceptions.DownloadError,
                TUFExceptions.RepositoryError,
            ) as e:
                raise TUFError("Failed to download trusted key bundle") from e

        _logger.debug("Found and verified trusted root")
        return path