1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16TUF functionality for `sigstore-python`.
17"""
18
19from __future__ import annotations
20
21import logging
22from functools import lru_cache
23from pathlib import Path
24from urllib import parse
25
26import platformdirs
27from tuf.api import exceptions as TUFExceptions
28from tuf.ngclient import Updater, UpdaterConfig # type: ignore[attr-defined]
29
30from sigstore import __version__
31from sigstore._utils import read_embedded
32from sigstore.errors import TUFError
33
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Base URLs of the two well-known Sigstore TUF repositories.
# NOTE(review): DEFAULT_TUF_URL is presumably the production instance and
# STAGING_TUF_URL the staging instance -- confirm against the deployment docs.
DEFAULT_TUF_URL = "https://tuf-repo-cdn.sigstore.dev"
STAGING_TUF_URL = "https://tuf-repo-cdn.sigstage.dev"
38
39
def _get_dirs(url: str) -> tuple[Path, Path]:
    """
    Map a TUF repository URL to its local metadata and targets cache directories.

    The URL is percent-encoded in full (no character is treated as safe) and
    used as a single path component below a `tuf` directory inside the
    per-user data and cache directories reported by `platformdirs`.

    Returns a `(metadata_dir, cache_dir)` pair. Nothing is created on disk
    here, so neither directory is guaranteed to exist yet.
    """

    # With `safe=""` even "/" and ":" are escaped, which keeps the whole URL
    # inside one directory name.
    encoded_url = parse.quote(url, safe="")

    name, author = "sigstore-python", "sigstore"
    data_root = Path(platformdirs.user_data_dir(name, author))
    cache_root = Path(platformdirs.user_cache_dir(name, author))

    return data_root / "tuf" / encoded_url, cache_root / "tuf" / encoded_url
56
57
class TrustUpdater:
    """Internal trust root (certificates and keys) downloader.

    TrustUpdater discovers the currently valid certificates and keys and
    securely downloads them from the remote TUF repository at 'url'.

    TrustUpdater expects to find an initial root.json in either the local
    metadata directory for this URL, or (as special case for the sigstore.dev
    production and staging instances) in the application resources.

    Resolved target paths are memoized on the instance itself: once a target
    has been resolved successfully it is not looked up again, and the memo is
    released together with the instance.
    """

    # Names of the TUF targets this class serves.
    _TRUSTED_ROOT_TARGET = "trusted_root.json"
    _SIGNING_CONFIG_TARGET = "signing_config.v0.2.json"

    def __init__(self, url: str, offline: bool = False) -> None:
        """
        Create a new `TrustUpdater`, pulling from the given `url`.

        TrustUpdater expects that either embedded data contains
        a root.json for this url or that local data has been initialized
        already.

        If not `offline`, TrustUpdater will update the TUF metadata from
        the remote repository; any failure during that refresh is raised as
        `TUFError`. Errors raised while constructing the underlying `Updater`
        are not wrapped.

        If `offline`, no `Updater` is created and targets are later served
        from the local targets cache without verification.
        """
        self._repo_url = url
        self._metadata_dir, self._targets_dir = _get_dirs(url)

        # Successfully resolved target paths, keyed by target name. Kept on
        # the instance (rather than in a class-level `lru_cache`) so that the
        # cache never holds a reference to `self`.
        self._target_paths: dict[str, str] = {}

        # Populate targets cache so we don't have to download these versions
        self._targets_dir.mkdir(parents=True, exist_ok=True)

        for artifact in (self._TRUSTED_ROOT_TARGET, self._SIGNING_CONFIG_TARGET):
            artifact_path = self._targets_dir / artifact
            if not artifact_path.exists():
                try:
                    data = read_embedded(artifact, url)
                    artifact_path.write_bytes(data)
                except FileNotFoundError:
                    pass  # this is ok: e.g. signing_config is not in prod repository yet

        _logger.debug("TUF metadata: %s", self._metadata_dir)
        _logger.debug("TUF targets cache: %s", self._targets_dir)

        self._updater: Updater | None = None
        if offline:
            _logger.warning(
                "TUF repository is loaded in offline mode; updates will not be performed"
            )
        else:
            # Initialize and update the toplevel TUF metadata
            try:
                root_json = read_embedded("root.json", url)
            except FileNotFoundError:
                # embedded root not found: we can still initialize _if_ the local metadata
                # exists already
                root_json = None

            self._updater = Updater(
                metadata_dir=str(self._metadata_dir),
                metadata_base_url=self._repo_url,
                target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"),
                target_dir=str(self._targets_dir),
                config=UpdaterConfig(app_user_agent=f"sigstore-python/{__version__}"),
                bootstrap=root_json,
            )

            try:
                self._updater.refresh()
            except Exception as e:
                # Deliberately broad: every refresh failure is reported to
                # callers as a single error type, with the cause chained.
                raise TUFError("Failed to refresh TUF metadata") from e

    def _get_target_path(self, target: str, *, label: str, download_label: str) -> str:
        """
        Return the local path of `target`, resolving it on first use.

        `label` names the target in log messages and in the "no such target"
        error; `download_label` names it in the download failure error.

        In offline mode (no updater) the path inside the targets cache is
        returned as-is, without verification. Otherwise the target is looked
        up in the TUF metadata, taken from the local cache if the updater
        finds it there, and downloaded if not.

        Raises `TUFError` if the repository does not list the target or if
        the download fails. Failures are not memoized, so a later call
        retries.
        """
        memoized = self._target_paths.get(target)
        if memoized is not None:
            return memoized

        if self._updater is None:
            _logger.debug("Using unverified %s from cache", label)
            path = str(self._targets_dir / target)
        else:
            info = self._updater.get_targetinfo(target)
            if info is None:
                raise TUFError(f"Unsupported TUF configuration: no {label}")

            found = self._updater.find_cached_target(info)
            if found is None:
                try:
                    found = self._updater.download_target(info)
                except (
                    TUFExceptions.DownloadError,
                    TUFExceptions.RepositoryError,
                ) as e:
                    raise TUFError(f"Failed to download {download_label}") from e

            _logger.debug("Found and verified %s", label)
            path = found

        self._target_paths[target] = path
        return path

    def get_trusted_root_path(self) -> str:
        """Return local path to currently valid trusted root file"""
        return self._get_target_path(
            self._TRUSTED_ROOT_TARGET,
            label="trusted root",
            download_label="trusted key bundle",
        )

    def get_signing_config_path(self) -> str:
        """Return local path to currently valid signing config file"""
        return self._get_target_path(
            self._SIGNING_CONFIG_TARGET,
            label="signing config",
            download_label="signing config",
        )