Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 31%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for retrieving OIDC tokens.
17"""
19from __future__ import annotations
21import logging
22import sys
23import time
24import urllib.parse
25import webbrowser
26from datetime import datetime, timezone
27from typing import NoReturn, Optional, cast
29import id
30import jwt
31import requests
32from pydantic import BaseModel, StrictStr
34from sigstore._internal import USER_AGENT
35from sigstore.errors import Error, NetworkError
# Maps each special-cased OIDC issuer URL to the name of the token claim that
# `IdentityToken` reads as the identity for tokens from that issuer. Issuers
# that are not listed here fall back to the "sub" claim.
# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
_KNOWN_OIDC_ISSUERS = {
    "https://accounts.google.com": "email",
    "https://oauth2.sigstore.dev/auth": "email",
    "https://oauth2.sigstage.dev/auth": "email",
    "https://token.actions.githubusercontent.com": "sub",
}

# Default OAuth client ID. It is also the default audience ("aud") that
# identity tokens are checked against when they are decoded.
_DEFAULT_CLIENT_ID = "sigstore"
class _OpenIDConfiguration(BaseModel):
    """
    The slice of an OpenID Connect provider's discovery document
    (`.well-known/openid-configuration`) that this module relies on.

    The document format is specified by OpenID Connect Discovery:
    <https://openid.net/specs/openid-connect-discovery-1_0.html>
    """

    # URL of the provider's authorization endpoint.
    authorization_endpoint: StrictStr
    # URL of the provider's token endpoint; authorization codes are
    # exchanged for tokens here.
    token_endpoint: StrictStr
class ExpiredIdentity(Exception):
    """
    Signals that an identity token has passed its expiration time.
    """
class IdentityToken:
    """
    An OIDC "identity", corresponding to an underlying OIDC token with
    a sensible subject, issuer, and audience for Sigstore purposes.
    """

    def __init__(self, raw_token: str, client_id: str = _DEFAULT_CLIENT_ID) -> None:
        """
        Create a new `IdentityToken` from the given OIDC token.

        `raw_token` is the encoded JWT, and `client_id` is the audience
        that the token is expected to carry.

        Raises `IdentityError` if the token cannot be decoded, is missing a
        required claim, carries a malformed claim, or is not currently within
        its validity period.
        """

        self._raw_token = raw_token

        # NOTE: The lack of verification here is intentional, and is part of
        # Sigstore's verification model: clients like sigstore-python are
        # responsible only for forwarding the OIDC identity to Fulcio for
        # certificate binding and issuance.
        try:
            self._unverified_claims = jwt.decode(
                raw_token,
                options={
                    "verify_signature": False,
                    "verify_aud": True,
                    "verify_iat": True,
                    "verify_exp": True,
                    # These claims are required by OpenID Connect, so
                    # we can strongly enforce their presence.
                    # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
                    "require": ["aud", "sub", "iat", "exp", "iss"],
                },
                audience=client_id,
                # NOTE: This leeway shouldn't be strictly necessary, but is
                # included to preempt any (small) skew between the host
                # and the originating IdP.
                leeway=5,
            )
        except Exception as exc:
            # Deliberately broad: any decoding failure, whatever its type,
            # is reported to the caller as a single `IdentityError`.
            raise IdentityError(
                "Identity token is malformed or missing claims"
            ) from exc

        self._iss: str = self._unverified_claims["iss"]
        self._nbf: int | None = self._unverified_claims.get("nbf")
        self._exp: int = self._unverified_claims["exp"]

        # `in_validity_period` compares these claims against a float
        # timestamp. Reject non-numeric values here so that a malformed
        # token surfaces as an `IdentityError` rather than a bare `TypeError`.
        if not self._is_numeric_date(self._exp):
            raise IdentityError("Identity token has a malformed 'exp' claim")
        if self._nbf is not None and not self._is_numeric_date(self._nbf):
            raise IdentityError("Identity token has a malformed 'nbf' claim")

        # Fail early if this token isn't within its validity period.
        if not self.in_validity_period():
            raise IdentityError("Identity token is not within its validity period")

        # When verifying the private key possession proof, Fulcio uses
        # different claims depending on the token's issuer.
        # We currently special-case a handful of these, and fall back
        # on signing the "sub" claim otherwise.
        identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer, "sub")
        try:
            self._identity = str(self._unverified_claims[identity_claim])
        except KeyError as exc:
            raise IdentityError(
                f"Identity token is missing the required {identity_claim!r} claim"
            ) from exc

        # This identity token might have been retrieved directly from
        # an identity provider, or it might be a "federated" identity token
        # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
        # In the latter case, the claims will also include a `federated_claims`
        # set, which in turn should include a `connector_id` that reflects
        # the "real" token issuer. We retrieve this, despite technically
        # being an implementation detail, because it has value to client
        # users: a client might want to make sure that its user is identifying
        # with a *particular* IdP, which means that they need to pierce the
        # federation layer to check which IdP is actually being used.
        self._federated_issuer: str | None = None
        federated_claims = self._unverified_claims.get("federated_claims")
        if federated_claims is not None:
            if not isinstance(federated_claims, dict):
                raise IdentityError(
                    "unexpected claim type: federated_claims is not a dict"
                )

            federated_issuer = federated_claims.get("connector_id")
            if federated_issuer is not None:
                if not isinstance(federated_issuer, str):
                    raise IdentityError(
                        "unexpected claim type: federated_claims.connector_id is not a string"
                    )

                self._federated_issuer = federated_issuer

    @staticmethod
    def _is_numeric_date(value: object) -> bool:
        """
        Returns whether `value` is a plain number usable as a timestamp.

        `bool` is excluded explicitly, since it is a subclass of `int`.
        """
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def in_validity_period(self) -> bool:
        """
        Returns whether or not this `IdentityToken` is currently within its
        self-stated validity period.

        NOTE: As noted in `IdentityToken.__init__`, this is not a verifying
        wrapper; the check here only asserts whether the *unverified*
        identity's claims are within their validity period.
        """

        now = datetime.now(timezone.utc).timestamp()

        # `nbf` is optional; when it is absent only the expiry bounds the token.
        if self._nbf is not None:
            return self._nbf <= now < self._exp
        else:
            return now < self._exp

    @property
    def identity(self) -> str:
        """
        Returns this `IdentityToken`'s underlying "subject".

        Note that this is **not** always the `sub` claim in the corresponding
        identity token: depending on the token's issuer, it may be a *different*
        claim, such as `email`. This corresponds to the Sigstore ecosystem's
        behavior, e.g. in each issued certificate's SAN.
        """
        return self._identity

    @property
    def issuer(self) -> str:
        """
        Returns a URL identifying this `IdentityToken`'s issuer.
        """
        return self._iss

    @property
    def federated_issuer(self) -> str:
        """
        Returns a URL identifying the **federated** issuer for any Sigstore
        certificate issued against this identity token.

        The behavior of this field is slightly subtle: for non-federated
        identity providers (like a token issued directly by Google's IdP) it
        should be exactly equivalent to `IdentityToken.issuer`. For federated
        issuers (like Sigstore's own federated IdP) it should be equivalent to
        the underlying federated issuer's URL, which is kept in an
        implementation-defined claim.

        This attribute exists so that clients who wish to inspect the expected
        underlying issuer of their certificates can do so without relying on
        implementation-specific behavior.
        """
        if self._federated_issuer is not None:
            return self._federated_issuer

        return self.issuer

    def __str__(self) -> str:
        """
        Returns the underlying OIDC token for this identity.

        Note that this token is secret in nature and **MUST NOT** be disclosed.
        """
        return self._raw_token
class IssuerError(Exception):
    """
    Signals a communication failure with an OIDC issuer, or a response
    from one that is not in the expected format.
    """
class Issuer:
    """
    Represents an OIDC issuer (IdP).
    """

    def __init__(self, base_url: str) -> None:
        """
        Create a new `Issuer` from the given base URL.

        This URL is used to locate an OpenID Connect configuration file,
        which is then used to bootstrap the issuer's state (such
        as authorization and token endpoints).

        Raises `NetworkError` if the configuration request cannot be
        completed, and `IssuerError` if the issuer answers with an HTTP error
        status or with a configuration document that fails validation.
        """
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": USER_AGENT})

        # The trailing slash makes `urljoin` append to `base_url` instead of
        # replacing its last path segment.
        oidc_config_url = urllib.parse.urljoin(
            f"{base_url}/", ".well-known/openid-configuration"
        )

        try:
            resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IssuerError from http_error

        try:
            # We don't generally expect this to fail (since the provider should
            # return a non-success HTTP code which we catch above), but we
            # check just in case we have a misbehaving OIDC issuer.
            self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
        except ValueError as exc:
            raise IssuerError(
                f"OIDC issuer returned invalid configuration: {exc}"
            ) from exc

    def identity_token(  # nosec: B107
        self,
        client_id: str = _DEFAULT_CLIENT_ID,
        client_secret: str = "",
        force_oob: bool = False,
    ) -> IdentityToken:
        """
        Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.

        This function blocks on user interaction.

        The `force_oob` flag controls the kind of flow performed. When `False` (the default),
        this function attempts to open the user's web browser before falling back to
        an out-of-band flow. When `True`, the out-of-band flow is always used.

        Raises `NetworkError` if the token request cannot be completed, and
        `IdentityError` if the authorization or token endpoint reports an
        error or returns a response that cannot be used.
        """

        # This function and the components that it relies on are based off of:
        # https://github.com/psteniusubi/python-sample

        from sigstore._internal.oidc.oauth import _OAuthFlow

        code: str
        with _OAuthFlow(client_id, client_secret, self) as server:
            # Launch web browser
            if not force_oob and webbrowser.open(server.base_uri):
                print("Waiting for browser interaction...", file=sys.stderr)
            else:
                server.enable_oob()
                print(
                    f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
                    file=sys.stderr,
                )

            if not server.is_oob():
                # Wait until the redirect server populates the response
                while server.auth_response is None:
                    time.sleep(0.1)

                auth_error = server.auth_response.get("error")
                if auth_error is not None:
                    raise IdentityError(
                        f"Error response from auth endpoint: {auth_error[0]}"
                    )
                try:
                    code = server.auth_response["code"][0]
                except (KeyError, IndexError) as exc:
                    # A response with neither an error nor a code would
                    # otherwise escape as a bare lookup error.
                    raise IdentityError(
                        "Auth endpoint response did not include an authorization code"
                    ) from exc
            else:
                # In the out-of-band case, we wait until the user provides the code
                code = input("Enter verification code: ")

            # Read the flow's parameters while the flow is still active.
            redirect_uri = server.redirect_uri
            code_verifier = server.oauth_session.code_verifier

        # Provide code to token endpoint
        data = {
            "grant_type": "authorization_code",
            "redirect_uri": redirect_uri,
            "code": code,
            "code_verifier": code_verifier,
        }
        auth = (
            client_id,
            client_secret,
        )
        # The authorization code and the PKCE verifier are credentials, so
        # they are masked before the payload is written to the debug log.
        redacted = {**data, "code": "<redacted>", "code_verifier": "<redacted>"}
        logging.debug("PAYLOAD: data=%s", redacted)
        try:
            resp = self.session.post(
                self.oidc_config.token_endpoint,
                data=data,
                auth=auth,
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise NetworkError from exc

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise IdentityError(
                f"Token request failed with {resp.status_code}"
            ) from http_error

        try:
            token_json = resp.json()
        except ValueError as exc:
            raise IdentityError(
                "Token endpoint returned a response that is not valid JSON"
            ) from exc
        if not isinstance(token_json, dict):
            raise IdentityError("Token endpoint returned an unexpected response")

        token_error = token_json.get("error")
        if token_error is not None:
            raise IdentityError(f"Error response from token endpoint: {token_error}")

        # NOTE(review): the identity token is read from the `access_token`
        # field of the response; confirm this matches the issuer's contract.
        try:
            raw_token = token_json["access_token"]
        except KeyError as exc:
            raise IdentityError(
                "Token endpoint response is missing the 'access_token' field"
            ) from exc

        return IdentityToken(raw_token, client_id)
class IdentityError(Error):
    """
    Wraps `id`'s IdentityError.

    This module also raises it directly for malformed identity tokens and
    for failures during the OAuth flow.
    """

    @classmethod
    def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
        """
        Raises a wrapped IdentityError from the provided `id.IdentityError`.

        The new error carries `str(exc)` as its message and keeps `exc` as
        its `__cause__`. This method never returns.
        """
        raise cls(str(exc)) from exc

    def diagnostics(self) -> str:
        """
        Returns diagnostics for the error.

        When this error was caused by an
        `id.GitHubOidcPermissionCredentialError`, the text explains the
        GitHub Actions `permissions` setting and includes the cause.
        Otherwise a generic ambient-credential message that includes this
        error is returned.
        """
        # NOTE(review): whitespace inside the literals below is part of the
        # returned text; confirm it against the canonical file before editing.
        if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
            return f"""
                Insufficient permissions for GitHub Actions workflow.

                The most common reason for this is incorrect
                configuration of the top-level `permissions` setting of the
                workflow YAML file. It should be configured like so:

                    permissions:
                      id-token: write

                Relevant documentation here:

                    https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings

                Another possible reason is that the workflow run has been
                triggered by a PR from a forked repository. PRs from forked
                repositories typically cannot be granted write access.

                Relevant documentation here:

                    https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token

                Additional context:

                {self.__cause__}
                """
        else:
            return f"""
                An issue occurred with ambient credential detection.

                Additional context:

                {self}
                """
def detect_credential(client_id: str = _DEFAULT_CLIENT_ID) -> str | None:
    """
    Delegates to `id.detect_credential` for the given `client_id`.

    Any `id.IdentityError` raised by the underlying call is re-raised as
    this module's own `IdentityError`, with the original kept as its cause.
    """

    try:
        credential = id.detect_credential(client_id)
    except id.IdentityError as exc:
        # Never returns: always raises the wrapped error.
        IdentityError.raise_from_id(exc)

    return cast(Optional[str], credential)