Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/tuf.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16TUF functionality for `sigstore-python`. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from functools import lru_cache 

23from pathlib import Path 

24from urllib import parse 

25 

26import platformdirs 

27from tuf.api import exceptions as TUFExceptions 

28from tuf.ngclient import Updater, UpdaterConfig # type: ignore[attr-defined] 

29 

30from sigstore import __version__ 

31from sigstore._utils import read_embedded 

32from sigstore.errors import TUFError 

33 

34_logger = logging.getLogger(__name__) 

35 

36DEFAULT_TUF_URL = "https://tuf-repo-cdn.sigstore.dev" 

37STAGING_TUF_URL = "https://tuf-repo-cdn.sigstage.dev" 

38 

39 

40def _get_dirs(url: str) -> tuple[Path, Path]: 

41 """ 

42 Given a TUF repository URL, return suitable local metadata and cache directories. 

43 

44 These directories are not guaranteed to already exist. 

45 """ 

46 

47 app_name = "sigstore-python" 

48 app_author = "sigstore" 

49 

50 repo_base = parse.quote(url, safe="") 

51 

52 tuf_data_dir = Path(platformdirs.user_data_dir(app_name, app_author)) / "tuf" 

53 tuf_cache_dir = Path(platformdirs.user_cache_dir(app_name, app_author)) / "tuf" 

54 

55 return (tuf_data_dir / repo_base), (tuf_cache_dir / repo_base) 

56 

57 

58class TrustUpdater: 

59 """Internal trust root (certificates and keys) downloader. 

60 

61 TrustUpdater discovers the currently valid certificates and keys and 

62 securely downloads them from the remote TUF repository at 'url'. 

63 

64 TrustUpdater expects to find an initial root.json in either the local 

65 metadata directory for this URL, or (as special case for the sigstore.dev 

66 production and staging instances) in the application resources. 

67 """ 

68 

69 def __init__(self, url: str, offline: bool = False) -> None: 

70 """ 

71 Create a new `TrustUpdater`, pulling from the given `url`. 

72 

73 TrustUpdater expects that either embedded data contains 

74 a root.json for this url or that local data has been initialized 

75 already. 

76 

77 If not `offline`, TrustUpdater will update the TUF metadata from 

78 the remote repository. 

79 """ 

80 self._repo_url = url 

81 self._metadata_dir, self._targets_dir = _get_dirs(url) 

82 

83 # Populate targets cache so we don't have to download these versions 

84 self._targets_dir.mkdir(parents=True, exist_ok=True) 

85 

86 for artifact in ["trusted_root.json", "signing_config.v0.2.json"]: 

87 artifact_path = self._targets_dir / artifact 

88 if not artifact_path.exists(): 

89 try: 

90 data = read_embedded(artifact, url) 

91 artifact_path.write_bytes(data) 

92 except FileNotFoundError: 

93 pass # this is ok: e.g. signing_config is not in prod repository yet 

94 

95 _logger.debug(f"TUF metadata: {self._metadata_dir}") 

96 _logger.debug(f"TUF targets cache: {self._targets_dir}") 

97 

98 self._updater: Updater | None = None 

99 if offline: 

100 _logger.warning( 

101 "TUF repository is loaded in offline mode; updates will not be performed" 

102 ) 

103 else: 

104 # Initialize and update the toplevel TUF metadata 

105 try: 

106 root_json = read_embedded("root.json", url) 

107 except FileNotFoundError: 

108 # embedded root not found: we can still initialize _if_ the local metadata 

109 # exists already 

110 root_json = None 

111 

112 self._updater = Updater( 

113 metadata_dir=str(self._metadata_dir), 

114 metadata_base_url=self._repo_url, 

115 target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"), 

116 target_dir=str(self._targets_dir), 

117 config=UpdaterConfig(app_user_agent=f"sigstore-python/{__version__}"), 

118 bootstrap=root_json, 

119 ) 

120 

121 try: 

122 self._updater.refresh() 

123 except Exception as e: 

124 raise TUFError("Failed to refresh TUF metadata") from e 

125 

126 @lru_cache() 

127 def get_trusted_root_path(self) -> str: 

128 """Return local path to currently valid trusted root file""" 

129 if not self._updater: 

130 _logger.debug("Using unverified trusted root from cache") 

131 return str(self._targets_dir / "trusted_root.json") 

132 

133 root_info = self._updater.get_targetinfo("trusted_root.json") 

134 if root_info is None: 

135 raise TUFError("Unsupported TUF configuration: no trusted root") 

136 path = self._updater.find_cached_target(root_info) 

137 if path is None: 

138 try: 

139 path = self._updater.download_target(root_info) 

140 except ( 

141 TUFExceptions.DownloadError, 

142 TUFExceptions.RepositoryError, 

143 ) as e: 

144 raise TUFError("Failed to download trusted key bundle") from e 

145 

146 _logger.debug("Found and verified trusted root") 

147 return path 

148 

149 @lru_cache() 

150 def get_signing_config_path(self) -> str: 

151 """Return local path to currently valid signing config file""" 

152 if not self._updater: 

153 _logger.debug("Using unverified signing config from cache") 

154 return str(self._targets_dir / "signing_config.v0.2.json") 

155 

156 root_info = self._updater.get_targetinfo("signing_config.v0.2.json") 

157 if root_info is None: 

158 raise TUFError("Unsupported TUF configuration: no signing config") 

159 path = self._updater.find_cached_target(root_info) 

160 if path is None: 

161 try: 

162 path = self._updater.download_target(root_info) 

163 except ( 

164 TUFExceptions.DownloadError, 

165 TUFExceptions.RepositoryError, 

166 ) as e: 

167 raise TUFError("Failed to download signing config") from e 

168 

169 _logger.debug("Found and verified signing config") 

170 return path