1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16TUF functionality for `sigstore-python`.
17"""
18
19from __future__ import annotations
20
21import logging
22from functools import lru_cache
23from pathlib import Path
24from urllib import parse
25
26import platformdirs
27from tuf.api import exceptions as TUFExceptions
28from tuf.ngclient import Updater, UpdaterConfig # type: ignore[attr-defined]
29
30from sigstore import __version__
31from sigstore._utils import read_embedded
32from sigstore.errors import TUFError
33
34_logger = logging.getLogger(__name__)
35
36DEFAULT_TUF_URL = "https://tuf-repo-cdn.sigstore.dev"
37STAGING_TUF_URL = "https://tuf-repo-cdn.sigstage.dev"
38
39
40def _get_dirs(url: str) -> tuple[Path, Path]:
41 """
42 Given a TUF repository URL, return suitable local metadata and cache directories.
43
44 These directories are not guaranteed to already exist.
45 """
46
47 app_name = "sigstore-python"
48 app_author = "sigstore"
49
50 repo_base = parse.quote(url, safe="")
51
52 tuf_data_dir = Path(platformdirs.user_data_dir(app_name, app_author)) / "tuf"
53 tuf_cache_dir = Path(platformdirs.user_cache_dir(app_name, app_author)) / "tuf"
54
55 return (tuf_data_dir / repo_base), (tuf_cache_dir / repo_base)
56
57
58class TrustUpdater:
59 """Internal trust root (certificates and keys) downloader.
60
61 TrustUpdater discovers the currently valid certificates and keys and
62 securely downloads them from the remote TUF repository at 'url'.
63
64 TrustUpdater expects to find an initial root.json in either the local
65 metadata directory for this URL, or (as special case for the sigstore.dev
66 production and staging instances) in the application resources.
67 """
68
69 def __init__(
70 self, url: str, offline: bool = False, bootstrap_root: Path | None = None
71 ) -> None:
72 """
73 Create a new `TrustUpdater`, pulling from the given `url`.
74
75 TrustUpdater expects that either embedded data contains
76 a root.json for this url or that `bootstrap_root` is provided as argument.
77
78 If not `offline`, TrustUpdater will update the TUF metadata from
79 the remote repository.
80 """
81 # not canonicalization, just handling trailing slash as common mistake:
82 url = url.rstrip("/")
83
84 self._metadata_dir, self._targets_dir = _get_dirs(url)
85
86 # Populate targets cache so we don't have to download these versions
87 self._targets_dir.mkdir(parents=True, exist_ok=True)
88
89 for artifact in ["trusted_root.json", "signing_config.v0.2.json"]:
90 artifact_path = self._targets_dir / artifact
91 if not artifact_path.exists():
92 try:
93 data = read_embedded(artifact, url)
94 artifact_path.write_bytes(data)
95 except FileNotFoundError:
96 pass # this is ok: we only have embedded data for specific repos
97
98 _logger.debug(f"TUF metadata: {self._metadata_dir}")
99 _logger.debug(f"TUF targets cache: {self._targets_dir}")
100
101 self._updater: Updater | None = None
102 if offline:
103 _logger.warning(
104 "TUF repository is loaded in offline mode; updates will not be performed"
105 )
106 else:
107 # Initialize and update the toplevel TUF metadata
108 try:
109 root_json: bytes | None = read_embedded("root.json", url)
110 except FileNotFoundError:
111 # We do not have embedded root metadata for this URL: we can still
112 # initialize _if_ given bootstrap root (i.e. during "sigstore trust-instance")
113 # or local metadata exists already (after "sigstore trust-instance")
114 root_json = bootstrap_root.read_bytes() if bootstrap_root else None
115
116 try:
117 self._updater = Updater(
118 metadata_dir=str(self._metadata_dir),
119 metadata_base_url=url,
120 target_base_url=parse.urljoin(f"{url}/", "targets/"),
121 target_dir=str(self._targets_dir),
122 config=UpdaterConfig(
123 app_user_agent=f"sigstore-python/{__version__}"
124 ),
125 bootstrap=root_json,
126 )
127 self._updater.refresh()
128 except Exception as e:
129 raise TUFError("Failed to refresh TUF metadata") from e
130
131 @lru_cache()
132 def get_trusted_root_path(self) -> str:
133 """Return local path to currently valid trusted root file"""
134 if not self._updater:
135 _logger.debug("Using unverified trusted root from cache")
136 return str(self._targets_dir / "trusted_root.json")
137
138 root_info = self._updater.get_targetinfo("trusted_root.json")
139 if root_info is None:
140 raise TUFError("Unsupported TUF configuration: no trusted root")
141 path = self._updater.find_cached_target(root_info)
142 if path is None:
143 try:
144 path = self._updater.download_target(root_info)
145 except (
146 TUFExceptions.DownloadError,
147 TUFExceptions.RepositoryError,
148 ) as e:
149 raise TUFError("Failed to download trusted key bundle") from e
150
151 _logger.debug("Found and verified trusted root")
152 return path
153
154 @lru_cache()
155 def get_signing_config_path(self) -> str:
156 """Return local path to currently valid signing config file"""
157 if not self._updater:
158 _logger.debug("Using unverified signing config from cache")
159 return str(self._targets_dir / "signing_config.v0.2.json")
160
161 root_info = self._updater.get_targetinfo("signing_config.v0.2.json")
162 if root_info is None:
163 raise TUFError("Unsupported TUF configuration: no signing config")
164 path = self._updater.find_cached_target(root_info)
165 if path is None:
166 try:
167 path = self._updater.download_target(root_info)
168 except (
169 TUFExceptions.DownloadError,
170 TUFExceptions.RepositoryError,
171 ) as e:
172 raise TUFError("Failed to download signing config") from e
173
174 _logger.debug("Found and verified signing config")
175 return path