Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/id/__init__.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

41 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16API for retrieving OIDC tokens. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import binascii 

23import json 

24from typing import Callable 

25 

26__version__ = "1.6.1" 

27 

28 

class IdentityError(Exception):
    """
    Base error for OIDC token problems: signals that a token is badly
    formatted or that one of its claims is wrong.
    """

35 

36 

class AmbientCredentialError(IdentityError):
    """
    Signals that an ambient credential was expected to be available but
    could not be obtained (for example, because of a network failure).
    """

44 

45 

class GitHubOidcPermissionCredentialError(AmbientCredentialError):
    """
    Signals that the current GitHub Actions environment lacks the
    permission needed to retrieve an OIDC token.
    """

53 

54 

def _validate_credential(credential: str, audience: str) -> None:
    """
    Sanity-check that `credential` roughly looks like a JWT issued for
    `audience`.

    Only the payload segment is decoded and inspected; the header and the
    signature are ignored, so this is *not* a cryptographic verification.

    The `aud` claim may be either a single string or an array of strings
    (both forms are allowed by RFC 7519, section 4.1.3). An array is accepted
    when it contains `audience`.

    Raises `AmbientCredentialError` if the token does not have exactly three
    dot-separated segments, if the payload is not base64url-encoded UTF-8
    JSON, if the payload is not a JSON object, if the `aud` claim is missing,
    or if the `aud` claim does not match `audience`.
    """
    try:
        # Unpacking raises ValueError unless there are exactly three segments.
        _, payload, _ = credential.split(".")
        # JWT segments omit base64 padding; "==" restores it for any valid
        # segment length.
        decoded_payload = base64.urlsafe_b64decode(payload + "==").decode("utf-8")
        payload_json = json.loads(decoded_payload)
    except (ValueError, binascii.Error, json.decoder.JSONDecodeError) as e:
        raise AmbientCredentialError("Malformed token") from e

    if not isinstance(payload_json, dict):
        raise AmbientCredentialError("Malformed token payload (JWT is not a JSON object)")
    if "aud" not in payload_json:
        raise AmbientCredentialError("Malformed token payload (audience claim is missing)")

    # Normalise the single-string form to a one-element list so that both
    # permitted shapes of the claim go through the same membership test.
    token_audience = payload_json["aud"]
    accepted = token_audience if isinstance(token_audience, list) else [token_audience]
    if audience not in accepted:
        raise AmbientCredentialError(
            f"Token audience claim mismatch (expected {audience}, got {token_audience})"
        )

73 

74 

def detect_credential(audience: str) -> str | None:
    """
    Run the ambient credential detectors one after another and return the
    first credential found, or `None` when no detector yields one.

    A credential that is found is checked with `_validate_credential` against
    `audience` before it is returned.

    Raises `AmbientCredentialError` if any detector fails internally (i.e.
    detects a credential, but cannot retrieve it), or if the retrieved
    credential does not pass validation.
    """
    from ._internal.oidc.ambient import (
        detect_buildkite,
        detect_circleci,
        detect_gcp,
        detect_github,
        detect_gitlab,
    )

    # Probes are tried strictly in this order; the first hit wins.
    probes: tuple[Callable[..., str | None], ...] = (
        detect_github,
        detect_gcp,
        detect_buildkite,
        detect_gitlab,
        detect_circleci,
    )

    for probe in probes:
        found = probe(audience)
        if found is None:
            # This environment is not present; move on to the next probe.
            continue
        _validate_credential(found, audience)
        return found

    return None

104 

105 

106def decode_oidc_token(token: str) -> tuple[str, str, str]: 

107 # Split the token into its three parts: header, payload, and signature 

108 header, payload, signature = token.split(".") 

109 

110 # Decode base64-encoded header and payload 

111 decoded_header = base64.urlsafe_b64decode(header + "==").decode("utf-8") 

112 decoded_payload = base64.urlsafe_b64decode(payload + "==").decode("utf-8") 

113 

114 return decoded_header, decoded_payload, signature