Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/iam.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Tools for using the Google `Cloud Identity and Access Management (IAM) 

16API`_'s auth-related functionality. 

17 

18.. _Cloud Identity and Access Management (IAM) API: 

19 https://cloud.google.com/iam/docs/ 

20""" 

21 

22import base64 

23import http.client as http_client 

24import json 

25import os 

26 

27from google.auth import _exponential_backoff 

28from google.auth import _helpers 

29from google.auth import credentials 

30from google.auth import crypt 

31from google.auth import exceptions 

32from google.auth.transport import mtls 

33 

34IAM_RETRY_CODES = { 

35 http_client.INTERNAL_SERVER_ERROR, 

36 http_client.BAD_GATEWAY, 

37 http_client.SERVICE_UNAVAILABLE, 

38 http_client.GATEWAY_TIMEOUT, 

39} 

40 

41_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"] 

42 

43# 1. Determine if we should use mTLS. 

44# Note: We only support automatic mTLS on the default googleapis.com universe. 

45if hasattr(mtls, "should_use_client_cert"): 

46 use_client_cert = mtls.should_use_client_cert() 

47else: # pragma: NO COVER 

48 # if unsupported, fallback to reading from env var 

49 use_client_cert = ( 

50 os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false").lower() == "true" 

51 ) 

52 

53# 2. Construct the template domain using the library's DEFAULT_UNIVERSE_DOMAIN constant. 

54# This ensures that the .replace() calls in the classes will work correctly. 

55if use_client_cert: 

56 # We use the .mtls. prefix only for the default universe template 

57 _IAM_DOMAIN = f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}" 

58else: 

59 _IAM_DOMAIN = f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}" 

60 

61# 3. Create the common base URL template 

62# We use double brackets {{}} so .format() can be called later for the email. 

63_IAM_BASE_URL = f"https://{_IAM_DOMAIN}/v1/projects/-/serviceAccounts/{{}}" 

64 

65# 4. Define the endpoints as templates 

66_IAM_ENDPOINT = _IAM_BASE_URL + ":generateAccessToken" 

67_IAM_SIGN_ENDPOINT = _IAM_BASE_URL + ":signBlob" 

68_IAM_SIGNJWT_ENDPOINT = _IAM_BASE_URL + ":signJwt" 

69_IAM_IDTOKEN_ENDPOINT = _IAM_BASE_URL + ":generateIdToken" 

70 

71 

72class Signer(crypt.Signer): 

73 """Signs messages using the IAM `signBlob API`_. 

74 

75 This is useful when you need to sign bytes but do not have access to the 

76 credential's private key file. 

77 

78 .. _signBlob API: 

79 https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts 

80 /signBlob 

81 """ 

82 

83 def __init__(self, request, credentials, service_account_email): 

84 """ 

85 Args: 

86 request (google.auth.transport.Request): The object used to make 

87 HTTP requests. 

88 credentials (google.auth.credentials.Credentials): The credentials 

89 that will be used to authenticate the request to the IAM API. 

90 The credentials must have of one the following scopes: 

91 

92 - https://www.googleapis.com/auth/iam 

93 - https://www.googleapis.com/auth/cloud-platform 

94 service_account_email (str): The service account email identifying 

95 which service account to use to sign bytes. Often, this can 

96 be the same as the service account email in the given 

97 credentials. 

98 """ 

99 self._request = request 

100 self._credentials = credentials 

101 self._service_account_email = service_account_email 

102 

103 def _make_signing_request(self, message): 

104 """Makes a request to the API signBlob API.""" 

105 message = _helpers.to_bytes(message) 

106 

107 method = "POST" 

108 url = _IAM_SIGN_ENDPOINT.replace( 

109 credentials.DEFAULT_UNIVERSE_DOMAIN, self._credentials.universe_domain 

110 ).format(self._service_account_email) 

111 headers = {"Content-Type": "application/json"} 

112 body = json.dumps( 

113 {"payload": base64.b64encode(message).decode("utf-8")} 

114 ).encode("utf-8") 

115 

116 retries = _exponential_backoff.ExponentialBackoff() 

117 for _ in retries: 

118 self._credentials.before_request(self._request, method, url, headers) 

119 

120 response = self._request(url=url, method=method, body=body, headers=headers) 

121 

122 if response.status in IAM_RETRY_CODES: 

123 continue 

124 

125 if response.status != http_client.OK: 

126 raise exceptions.TransportError( 

127 "Error calling the IAM signBlob API: {}".format(response.data) 

128 ) 

129 

130 return json.loads(response.data.decode("utf-8")) 

131 raise exceptions.TransportError("exhausted signBlob endpoint retries") 

132 

133 @property 

134 def key_id(self): 

135 """Optional[str]: The key ID used to identify this private key. 

136 

137 .. warning:: 

138 This is always ``None``. The key ID used by IAM can not 

139 be reliably determined ahead of time. 

140 """ 

141 return None 

142 

143 @_helpers.copy_docstring(crypt.Signer) 

144 def sign(self, message): 

145 response = self._make_signing_request(message) 

146 return base64.b64decode(response["signedBlob"])