Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/oidc.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""
16API for retrieving OIDC tokens.
17"""
19from __future__ import annotations
21import logging
22import sys
23import time
24import urllib.parse
25import webbrowser
26from datetime import datetime, timezone
27from typing import NoReturn, cast
29import id
30import jwt
31import requests
32from pydantic import BaseModel, StrictStr
34from sigstore._internal import USER_AGENT
35from sigstore.errors import Error, NetworkError
37# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201
38_KNOWN_OIDC_ISSUERS = {
39 "https://accounts.google.com": "email",
40 "https://oauth2.sigstore.dev/auth": "email",
41 "https://oauth2.sigstage.dev/auth": "email",
42 "https://token.actions.githubusercontent.com": "sub",
43}
45_DEFAULT_CLIENT_ID = "sigstore"
48class _OpenIDConfiguration(BaseModel):
49 """
50 Represents a (subset) of the fields provided by an OpenID Connect provider's
51 `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery.
53 See: <https://openid.net/specs/openid-connect-discovery-1_0.html>
54 """
56 authorization_endpoint: StrictStr
57 token_endpoint: StrictStr
60class ExpiredIdentity(Exception):
61 """An error raised when an identity token is expired."""
64class IdentityToken:
65 """
66 An OIDC "identity", corresponding to an underlying OIDC token with
67 a sensible subject, issuer, and audience for Sigstore purposes.
68 """
70 def __init__(self, raw_token: str, client_id: str = _DEFAULT_CLIENT_ID) -> None:
71 """
72 Create a new `IdentityToken` from the given OIDC token.
73 """
75 self._raw_token = raw_token
77 # NOTE: The lack of verification here is intentional, and is part of
78 # Sigstore's verification model: clients like sigstore-python are
79 # responsible only for forwarding the OIDC identity to Fulcio for
80 # certificate binding and issuance.
81 try:
82 self._unverified_claims = jwt.decode(
83 raw_token,
84 options={
85 "verify_signature": False,
86 "verify_aud": True,
87 "verify_iat": True,
88 "verify_exp": True,
89 # These claims are required by OpenID Connect, so
90 # we can strongly enforce their presence.
91 # See: https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
92 "require": ["aud", "sub", "iat", "exp", "iss"],
93 },
94 audience=client_id,
95 # NOTE: This leeway shouldn't be strictly necessary, but is
96 # included to preempt any (small) skew between the host
97 # and the originating IdP.
98 leeway=5,
99 )
100 except Exception as exc:
101 raise IdentityError(
102 "Identity token is malformed or missing claims"
103 ) from exc
105 self._iss: str = self._unverified_claims["iss"]
106 self._nbf: int | None = self._unverified_claims.get("nbf")
107 self._exp: int = self._unverified_claims["exp"]
109 # Fail early if this token isn't within its validity period.
110 if not self.in_validity_period():
111 raise IdentityError("Identity token is not within its validity period")
113 # When verifying the private key possession proof, Fulcio uses
114 # different claims depending on the token's issuer.
115 # We currently special-case a handful of these, and fall back
116 # on signing the "sub" claim otherwise.
117 identity_claim = _KNOWN_OIDC_ISSUERS.get(self.issuer)
118 if identity_claim is not None:
119 if identity_claim not in self._unverified_claims:
120 raise IdentityError(
121 f"Identity token is missing the required {identity_claim!r} claim"
122 )
124 self._identity = str(self._unverified_claims.get(identity_claim))
125 else:
126 try:
127 self._identity = str(self._unverified_claims["sub"])
128 except KeyError:
129 raise IdentityError(
130 "Identity token is missing the required 'sub' claim"
131 )
133 # This identity token might have been retrieved directly from
134 # an identity provider, or it might be a "federated" identity token
135 # retrieved from a federated IdP (e.g., Sigstore's own Dex instance).
136 # In the latter case, the claims will also include a `federated_claims`
137 # set, which in turn should include a `connector_id` that reflects
138 # the "real" token issuer. We retrieve this, despite technically
139 # being an implementation detail, because it has value to client
140 # users: a client might want to make sure that its user is identifying
141 # with a *particular* IdP, which means that they need to pierce the
142 # federation layer to check which IdP is actually being used.
143 self._federated_issuer: str | None = None
144 federated_claims = self._unverified_claims.get("federated_claims")
145 if federated_claims is not None:
146 if not isinstance(federated_claims, dict):
147 raise IdentityError(
148 "unexpected claim type: federated_claims is not a dict"
149 )
151 federated_issuer = federated_claims.get("connector_id")
152 if federated_issuer is not None:
153 if not isinstance(federated_issuer, str):
154 raise IdentityError(
155 "unexpected claim type: federated_claims.connector_id is not a string"
156 )
158 self._federated_issuer = federated_issuer
160 def in_validity_period(self) -> bool:
161 """
162 Returns whether or not this `Identity` is currently within its self-stated validity period.
164 NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;
165 the check here only asserts whether the *unverified* identity's claims
166 are within their validity period.
167 """
169 now = datetime.now(timezone.utc).timestamp()
171 if self._nbf is not None:
172 return self._nbf <= now < self._exp
173 else:
174 return now < self._exp
176 @property
177 def identity(self) -> str:
178 """
179 Returns this `IdentityToken`'s underlying "subject".
181 Note that this is **not** always the `sub` claim in the corresponding
182 identity token: depending onm the token's issuer, it may be a *different*
183 claim, such as `email`. This corresponds to the Sigstore ecosystem's
184 behavior, e.g. in each issued certificate's SAN.
185 """
186 return self._identity
188 @property
189 def issuer(self) -> str:
190 """
191 Returns a URL identifying this `IdentityToken`'s issuer.
192 """
193 return self._iss
195 @property
196 def federated_issuer(self) -> str:
197 """
198 Returns a URL identifying the **federated** issuer for any Sigstore
199 certificate issued against this identity token.
201 The behavior of this field is slightly subtle: for non-federated
202 identity providers (like a token issued directly by Google's IdP) it
203 should be exactly equivalent to `IdentityToken.issuer`. For federated
204 issuers (like Sigstore's own federated IdP) it should be equivalent to
205 the underlying federated issuer's URL, which is kept in an
206 implementation-defined claim.
208 This attribute exists so that clients who wish to inspect the expected
209 underlying issuer of their certificates can do so without relying on
210 implementation-specific behavior.
211 """
212 if self._federated_issuer is not None:
213 return self._federated_issuer
215 return self.issuer
217 def __str__(self) -> str:
218 """
219 Returns the underlying OIDC token for this identity.
221 That this token is secret in nature and **MUST NOT** be disclosed.
222 """
223 return self._raw_token
226class IssuerError(Exception):
227 """
228 Raised on any communication or format error with an OIDC issuer.
229 """
231 pass
234class Issuer:
235 """
236 Represents an OIDC issuer (IdP).
237 """
239 def __init__(self, base_url: str) -> None:
240 """
241 Create a new `Issuer` from the given base URL.
243 This URL is used to locate an OpenID Connect configuration file,
244 which is then used to bootstrap the issuer's state (such
245 as authorization and token endpoints).
246 """
247 self.session = requests.Session()
248 self.session.headers.update({"User-Agent": USER_AGENT})
250 oidc_config_url = urllib.parse.urljoin(
251 f"{base_url}/", ".well-known/openid-configuration"
252 )
254 try:
255 resp: requests.Response = self.session.get(oidc_config_url, timeout=30)
256 except (requests.ConnectionError, requests.Timeout) as exc:
257 raise NetworkError from exc
259 try:
260 resp.raise_for_status()
261 except requests.HTTPError as http_error:
262 raise IssuerError from http_error
264 try:
265 # We don't generally expect this to fail (since the provider should
266 # return a non-success HTTP code which we catch above), but we
267 # check just in case we have a misbehaving OIDC issuer.
268 self.oidc_config = _OpenIDConfiguration.model_validate(resp.json())
269 except ValueError as exc:
270 raise IssuerError(f"OIDC issuer returned invalid configuration: {exc}")
272 def identity_token( # nosec: B107
273 self,
274 client_id: str = _DEFAULT_CLIENT_ID,
275 client_secret: str = "",
276 force_oob: bool = False,
277 ) -> IdentityToken:
278 """
279 Retrieves and returns an `IdentityToken` from the current `Issuer`, via OAuth.
281 This function blocks on user interaction.
283 The `force_oob` flag controls the kind of flow performed. When `False` (the default),
284 this function attempts to open the user's web browser before falling back to
285 an out-of-band flow. When `True`, the out-of-band flow is always used.
286 """
288 # This function and the components that it relies on are based off of:
289 # https://github.com/psteniusubi/python-sample
291 from sigstore._internal.oidc.oauth import _OAuthFlow
293 code: str
294 with _OAuthFlow(client_id, client_secret, self) as server:
295 # Launch web browser
296 if not force_oob and webbrowser.open(server.base_uri):
297 print("Waiting for browser interaction...", file=sys.stderr)
298 else:
299 server.enable_oob()
300 print(
301 f"Go to the following link in a browser:\n\n\t{server.auth_endpoint}",
302 file=sys.stderr,
303 )
305 if not server.is_oob():
306 # Wait until the redirect server populates the response
307 while server.auth_response is None:
308 time.sleep(0.1)
310 auth_error = server.auth_response.get("error")
311 if auth_error is not None:
312 raise IdentityError(
313 f"Error response from auth endpoint: {auth_error[0]}"
314 )
316 if server.auth_response["state"][0] != server.oauth_session.state:
317 raise IdentityError("OAuth state mismatch")
319 code = server.auth_response["code"][0]
320 else:
321 # In the out-of-band case, we wait until the user provides the code
322 code = input("Enter verification code: ")
324 # Provide code to token endpoint
325 data = {
326 "grant_type": "authorization_code",
327 "redirect_uri": server.redirect_uri,
328 "code": code,
329 "code_verifier": server.oauth_session.code_verifier,
330 }
331 auth = (
332 client_id,
333 client_secret,
334 )
335 logging.debug(f"PAYLOAD: data={data}")
336 try:
337 resp = self.session.post(
338 self.oidc_config.token_endpoint,
339 data=data,
340 auth=auth,
341 timeout=30,
342 )
343 except (requests.ConnectionError, requests.Timeout) as exc:
344 raise NetworkError from exc
346 try:
347 resp.raise_for_status()
348 except requests.HTTPError as http_error:
349 raise IdentityError(
350 f"Token request failed with {resp.status_code}"
351 ) from http_error
353 token_json = resp.json()
354 token_error = token_json.get("error")
355 if token_error is not None:
356 raise IdentityError(f"Error response from token endpoint: {token_error}")
358 return IdentityToken(token_json["access_token"], client_id)
361class IdentityError(Error):
362 """
363 Wraps `id`'s IdentityError.
364 """
366 @classmethod
367 def raise_from_id(cls, exc: id.IdentityError) -> NoReturn:
368 """Raises a wrapped IdentityError from the provided `id.IdentityError`."""
369 raise cls(str(exc)) from exc
371 def diagnostics(self) -> str:
372 """Returns diagnostics for the error."""
373 if isinstance(self.__cause__, id.GitHubOidcPermissionCredentialError):
374 return f"""
375 Insufficient permissions for GitHub Actions workflow.
377 The most common reason for this is incorrect
378 configuration of the top-level `permissions` setting of the
379 workflow YAML file. It should be configured like so:
381 permissions:
382 id-token: write
384 Relevant documentation here:
386 https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/about-security-hardening-with-openid-connect#adding-permissions-settings
388 Another possible reason is that the workflow run has been
389 triggered by a PR from a forked repository. PRs from forked
390 repositories typically cannot be granted write access.
392 Relevant documentation here:
394 https://docs.github.com/en/actions/security-guides/automatic-token-authentication#modifying-the-permissions-for-the-github_token
396 Additional context:
398 {self.__cause__}
399 """
400 else:
401 return f"""
402 An issue occurred with ambient credential detection.
404 Additional context:
406 {self}
407 """
410def detect_credential(client_id: str = _DEFAULT_CLIENT_ID) -> str | None:
411 """Calls `id.detect_credential`, but wraps exceptions with our own exception type."""
413 try:
414 return cast(str | None, id.detect_credential(client_id))
415 except id.IdentityError as exc:
416 IdentityError.raise_from_id(exc)