Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/tuf.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

70 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16TUF functionality for `sigstore-python`. 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from functools import lru_cache 

23from pathlib import Path 

24from urllib import parse 

25 

26import platformdirs 

27from tuf.api import exceptions as TUFExceptions 

28from tuf.ngclient import Updater, UpdaterConfig # type: ignore[attr-defined] 

29 

30from sigstore import __version__ 

31from sigstore._utils import read_embedded 

32from sigstore.errors import RootError, TUFError 

33 

# Module-level logger, named after this module.
34_logger = logging.getLogger(__name__) 

35 

# Base URLs of the TUF repositories this module has embedded resources for:
# `TrustUpdater` maps DEFAULT_TUF_URL to the "prod" resources and
# STAGING_TUF_URL to the "staging" resources, and rejects any other URL.
36DEFAULT_TUF_URL = "https://tuf-repo-cdn.sigstore.dev" 

37STAGING_TUF_URL = "https://tuf-repo-cdn.sigstage.dev" 

38 

39 

def _get_dirs(url: str) -> tuple[Path, Path]:
    """
    Compute the local metadata and target-cache directories for a TUF repository.

    The whole repository URL is percent-encoded (no character is treated as
    safe), so it forms a single path component beneath the `tuf`
    subdirectory of the `sigstore-python` user data and user cache
    directories reported by `platformdirs`.

    Nothing is created on disk here; callers must create the directories
    themselves if they need them to exist.

    Returns a `(metadata_dir, targets_cache_dir)` pair.
    """

    # (application name, application author) as expected by platformdirs.
    app = ("sigstore-python", "sigstore")

    encoded_url = parse.quote(url, safe="")

    data_root = Path(platformdirs.user_data_dir(*app))
    cache_root = Path(platformdirs.user_cache_dir(*app))

    return data_root / "tuf" / encoded_url, cache_root / "tuf" / encoded_url

56 

57 

class TrustUpdater:
    """Internal trust root (certificates and keys) downloader.

    TrustUpdater discovers the currently valid certificates and keys and
    securely downloads them from the remote TUF repository at 'url'.

    TrustUpdater expects to find an initial root.json in either the local
    metadata directory for this URL, or (as a special case for the sigstore.dev
    production and staging instances) in the application resources.
    """

    def __init__(self, url: str, offline: bool = False) -> None:
        """
        Create a new `TrustUpdater`, pulling from the given `url`.

        The URL is expected to match one of `sigstore-python`'s known TUF
        roots, i.e. for the production or staging Sigstore TUF repos.

        If not `offline`, TrustUpdater will update the TUF metadata from
        the remote repository.

        Raises `RootError` if `url` is not one of the known repositories or
        if a required embedded resource cannot be found, and `TUFError` if
        refreshing the TUF metadata fails.
        """
        self._repo_url = url
        self._metadata_dir, self._targets_dir = _get_dirs(url)

        # Per-instance memo for `get_trusted_root_path()`. Keeping it on the
        # instance (rather than in a process-wide `lru_cache` keyed on `self`)
        # lets the updater be garbage collected once it is no longer used.
        self._trusted_root_path: str | None = None

        rsrc_prefix: str
        if self._repo_url == DEFAULT_TUF_URL:
            rsrc_prefix = "prod"
        elif self._repo_url == STAGING_TUF_URL:
            rsrc_prefix = "staging"
        else:
            raise RootError

        # Initialize targets cache dir
        self._targets_dir.mkdir(parents=True, exist_ok=True)
        trusted_root_target = self._targets_dir / "trusted_root.json"

        # Seed the cache with the embedded trusted root so that a usable
        # file exists even when no update is performed (offline mode).
        if not trusted_root_target.exists():
            try:
                trusted_root_json = read_embedded("trusted_root.json", rsrc_prefix)
            except FileNotFoundError as e:
                raise RootError from e

            trusted_root_target.write_bytes(trusted_root_json)

        _logger.debug(f"TUF metadata: {self._metadata_dir}")
        _logger.debug(f"TUF targets cache: {self._targets_dir}")

        # `None` means "offline": no TUF client exists and nothing is verified.
        self._updater: Updater | None = None
        if offline:
            _logger.warning(
                "TUF repository is loaded in offline mode; updates will not be performed"
            )
        else:
            # Initialize and update the toplevel TUF metadata
            try:
                root_json = read_embedded("root.json", rsrc_prefix)
            except FileNotFoundError as e:
                raise RootError from e
            self._updater = Updater(
                metadata_dir=str(self._metadata_dir),
                metadata_base_url=self._repo_url,
                target_base_url=parse.urljoin(f"{self._repo_url}/", "targets/"),
                target_dir=str(self._targets_dir),
                config=UpdaterConfig(app_user_agent=f"sigstore-python/{__version__}"),
                bootstrap=root_json,
            )
            try:
                self._updater.refresh()
            except Exception as e:
                raise TUFError("Failed to refresh TUF metadata") from e

    def get_trusted_root_path(self) -> str:
        """Return local path to currently valid trusted root file

        The path is resolved on the first successful call and remembered on
        this instance; later calls return the remembered value without
        consulting the TUF client again. A failed call is not remembered.

        Raises `TUFError` if the repository does not list a trusted root or
        if downloading it fails.
        """
        # `getattr` keeps this working for instances whose `__init__` did not
        # run to the point where the memo attribute is created.
        cached: str | None = getattr(self, "_trusted_root_path", None)
        if cached is None:
            cached = self._resolve_trusted_root_path()
            self._trusted_root_path = cached
        return cached

    def _resolve_trusted_root_path(self) -> str:
        """Locate the trusted root file, downloading it if it is not cached."""
        updater = self._updater
        if not updater:
            # Offline: fall back to whatever file is in the targets cache.
            _logger.debug("Using unverified trusted root from cache")
            return str(self._targets_dir / "trusted_root.json")

        root_info = updater.get_targetinfo("trusted_root.json")
        if root_info is None:
            raise TUFError("Unsupported TUF configuration: no trusted root")

        path = updater.find_cached_target(root_info)
        if path is None:
            try:
                path = updater.download_target(root_info)
            except (
                TUFExceptions.DownloadError,
                TUFExceptions.RepositoryError,
            ) as e:
                raise TUFError("Failed to download trusted key bundle") from e

        _logger.debug("Found and verified trusted root")
        return path