Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/iam.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Tools for using the Google `Cloud Identity and Access Management (IAM) 

16API`_'s auth-related functionality. 

17 

18.. _Cloud Identity and Access Management (IAM) API: 

19 https://cloud.google.com/iam/docs/ 

20""" 

21 

22import base64 

23import http.client as http_client 

24import json 

25 

26from google.auth import _exponential_backoff 

27from google.auth import _helpers 

28from google.auth import credentials 

29from google.auth import crypt 

30from google.auth import exceptions 

31from google.auth.transport import _mtls_helper 

32 

# HTTP status codes from the IAM API that cause a signing request to be
# retried rather than reported as an error.
IAM_RETRY_CODES = {
    http_client.INTERNAL_SERVER_ERROR,
    http_client.BAD_GATEWAY,
    http_client.SERVICE_UNAVAILABLE,
    http_client.GATEWAY_TIMEOUT,
}

# Scope list for calling the IAM API.
_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

# Pick the IAM Credentials host. The mTLS host is used only when
# `_mtls_helper` exposes `check_use_client_cert` and that check returns a
# truthy value; the host is built on the library's DEFAULT_UNIVERSE_DOMAIN
# constant either way.
_IAM_DOMAIN = (
    f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
    if (
        hasattr(_mtls_helper, "check_use_client_cert")
        and _mtls_helper.check_use_client_cert()
    )
    else f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
)

# Common URL prefix for the per-service-account endpoints. The trailing "{}"
# is left as a literal placeholder so callers can `.format()` in the service
# account email later.
_IAM_BASE_URL = "https://" + _IAM_DOMAIN + "/v1/projects/-/serviceAccounts/{}"

# Endpoint templates: the base URL followed by the method suffix.
_IAM_ENDPOINT = f"{_IAM_BASE_URL}:generateAccessToken"
_IAM_SIGN_ENDPOINT = f"{_IAM_BASE_URL}:signBlob"
_IAM_SIGNJWT_ENDPOINT = f"{_IAM_BASE_URL}:signJwt"
_IAM_IDTOKEN_ENDPOINT = f"{_IAM_BASE_URL}:generateIdToken"


# Regional Access Boundary (RAB) lookup endpoint templates. The named
# placeholders ({service_account_email}, {pool_id}, {project_number}) are
# literal and are meant to be filled in with `.format(...)`.
_SERVICE_ACCOUNT_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://"
    + _IAM_DOMAIN
    + "/v1/projects/-/serviceAccounts/{service_account_email}/allowedLocations"
)
_WORKFORCE_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://"
    + _IAM_DOMAIN
    + "/v1/locations/global/workforcePools/{pool_id}/allowedLocations"
)
_WORKLOAD_IDENTITY_POOL_REGIONAL_ACCESS_BOUNDARY_LOOKUP_ENDPOINT = (
    "https://"
    + _IAM_DOMAIN
    + "/v1/projects/{project_number}/locations/global"
    + "/workloadIdentityPools/{pool_id}/allowedLocations"
)

67 

68 

class Signer(crypt.Signer):
    """Signs messages by calling the IAM `signBlob API`_.

    Use this when bytes need to be signed but the credential's private key
    file is not available locally.

    .. _signBlob API:
        https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts
        /signBlob
    """

    def __init__(self, request, credentials, service_account_email):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            credentials (google.auth.credentials.Credentials): The credentials
                used to authenticate the request to the IAM API. The
                credentials must have one of the following scopes:

                - https://www.googleapis.com/auth/iam
                - https://www.googleapis.com/auth/cloud-platform
            service_account_email (str): The email of the service account
                whose identity is used to sign bytes. Often this is the same
                service account email as in the given credentials.
        """
        self._request = request
        self._credentials = credentials
        self._service_account_email = service_account_email

    def _make_signing_request(self, message):
        """Calls the IAM signBlob API and returns the decoded JSON response.

        Args:
            message: The data to sign; it is converted with
                ``_helpers.to_bytes`` before being base64-encoded.

        Returns:
            The JSON-decoded body of the first response with status 200.

        Raises:
            google.auth.exceptions.TransportError: If the API answers with a
                status that is neither 200 nor in ``IAM_RETRY_CODES``, or if
                every retry attempt ends with a retryable status.
        """
        message = _helpers.to_bytes(message)

        method = "POST"
        # The endpoint template is built on the default universe domain; swap
        # in the credentials' own domain before filling in the email.
        endpoint_template = _IAM_SIGN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, self._credentials.universe_domain
        )
        url = endpoint_template.format(self._service_account_email)
        headers = {"Content-Type": "application/json"}
        encoded_payload = base64.b64encode(message).decode("utf-8")
        body = json.dumps({"payload": encoded_payload}).encode("utf-8")

        for _ in _exponential_backoff.ExponentialBackoff():
            # Let the credentials prepare the request headers on every attempt.
            self._credentials.before_request(self._request, method, url, headers)

            response = self._request(
                url=url, method=method, body=body, headers=headers
            )

            if response.status in IAM_RETRY_CODES:
                # Retryable failure: move on to the next backoff attempt.
                continue

            if response.status == http_client.OK:
                return json.loads(response.data.decode("utf-8"))

            error_text = "Error calling the IAM signBlob API: {}".format(
                response.data
            )
            raise exceptions.TransportError(error_text)

        # The backoff iterator ran out while the API kept returning retryable
        # statuses.
        raise exceptions.TransportError("exhausted signBlob endpoint retries")

    @property
    def key_id(self):
        """Optional[str]: The key ID used to identify this private key.

        .. warning::
            Always ``None``: the key ID that IAM uses cannot be reliably
            determined ahead of time.
        """
        return None

    # No docstring here on purpose: the decorator supplies it from the
    # `crypt.Signer` base class.
    @_helpers.copy_docstring(crypt.Signer)
    def sign(self, message):
        signing_response = self._make_signing_request(message)
        encoded_signature = signing_response["signedBlob"]
        return base64.b64decode(encoded_signature)