Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/iam.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Tools for using the Google `Cloud Identity and Access Management (IAM)
16API`_'s auth-related functionality.
18.. _Cloud Identity and Access Management (IAM) API:
19 https://cloud.google.com/iam/docs/
20"""
22import base64
23import http.client as http_client
24import json
26from google.auth import _exponential_backoff
27from google.auth import _helpers
28from google.auth import credentials
29from google.auth import crypt
30from google.auth import exceptions
31from google.auth.transport import _mtls_helper
# HTTP status codes for which a call to the IAM API is attempted again
# instead of being reported as an error.
IAM_RETRY_CODES = {
    http_client.INTERNAL_SERVER_ERROR,
    http_client.BAD_GATEWAY,
    http_client.SERVICE_UNAVAILABLE,
    http_client.GATEWAY_TIMEOUT,
}

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

# Host name of the IAM credentials service. The "mtls" variant is selected
# when the mTLS helper exposes ``check_use_client_cert`` and that check
# returns a truthy value. The host is built from DEFAULT_UNIVERSE_DOMAIN so
# that it can later be swapped for a credential's own universe domain.
_IAM_DOMAIN = (
    f"iamcredentials.mtls.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
    if hasattr(_mtls_helper, "check_use_client_cert")
    and _mtls_helper.check_use_client_cert()
    else f"iamcredentials.{credentials.DEFAULT_UNIVERSE_DOMAIN}"
)

# Common URL prefix shared by every endpoint below. The trailing "{}" is a
# placeholder that is filled with the service account email via .format().
_IAM_BASE_URL = "https://" + _IAM_DOMAIN + "/v1/projects/-/serviceAccounts/{}"

# Endpoint templates: the base URL followed by the IAM method name.
_IAM_ENDPOINT = f"{_IAM_BASE_URL}:generateAccessToken"
_IAM_SIGN_ENDPOINT = f"{_IAM_BASE_URL}:signBlob"
_IAM_SIGNJWT_ENDPOINT = f"{_IAM_BASE_URL}:signJwt"
_IAM_IDTOKEN_ENDPOINT = f"{_IAM_BASE_URL}:generateIdToken"
class Signer(crypt.Signer):
    """Signs messages using the IAM `signBlob API`_.

    Use this when bytes have to be signed but the credential's private key
    file is not available.

    .. _signBlob API:
        https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts
        /signBlob
    """

    def __init__(self, request, credentials, service_account_email):
        """
        Args:
            request (google.auth.transport.Request): The object used to make
                HTTP requests.
            credentials (google.auth.credentials.Credentials): The credentials
                that will be used to authenticate the request to the IAM API.
                The credentials must have one of the following scopes:

                - https://www.googleapis.com/auth/iam
                - https://www.googleapis.com/auth/cloud-platform
            service_account_email (str): The service account email identifying
                which service account to use to sign bytes. Often, this can
                be the same as the service account email in the given
                credentials.
        """
        self._service_account_email = service_account_email
        self._credentials = credentials
        self._request = request

    def _make_signing_request(self, message):
        """Calls the IAM signBlob API to sign ``message``.

        Args:
            message: The data to sign. It is converted with
                ``_helpers.to_bytes`` and sent base64-encoded in the
                ``payload`` field of a JSON request body.

        Returns:
            The JSON-decoded body of the successful (HTTP 200) response.

        Raises:
            google.auth.exceptions.TransportError: If the API answers with a
                status that is neither 200 nor one of ``IAM_RETRY_CODES``,
                or if every retry attempt ended with a retryable status.
        """
        raw_message = _helpers.to_bytes(message)

        http_method = "POST"
        # The endpoint template is built for the default universe domain;
        # substitute the domain of the credentials before filling in the
        # service account email.
        endpoint_template = _IAM_SIGN_ENDPOINT.replace(
            credentials.DEFAULT_UNIVERSE_DOMAIN, self._credentials.universe_domain
        )
        url = endpoint_template.format(self._service_account_email)
        headers = {"Content-Type": "application/json"}
        encoded_payload = base64.b64encode(raw_message).decode("utf-8")
        body = json.dumps({"payload": encoded_payload}).encode("utf-8")

        # Each iteration of the backoff object is one attempt.
        for _ in _exponential_backoff.ExponentialBackoff():
            # Give the credentials the chance to update the headers for
            # this attempt before the request is sent.
            self._credentials.before_request(
                self._request, http_method, url, headers
            )

            response = self._request(
                url=url, method=http_method, body=body, headers=headers
            )

            if response.status in IAM_RETRY_CODES:
                # Retryable server-side failure: move on to the next attempt.
                continue

            if response.status == http_client.OK:
                return json.loads(response.data.decode("utf-8"))

            # Any other status is reported immediately without retrying.
            raise exceptions.TransportError(
                f"Error calling the IAM signBlob API: {response.data}"
            )

        # The backoff iterator ran out while the API kept returning
        # retryable statuses.
        raise exceptions.TransportError("exhausted signBlob endpoint retries")

    @property
    def key_id(self):
        """Optional[str]: The key ID used to identify this private key.

        .. warning::
           This is always ``None``. The key ID used by IAM can not
           be reliably determined ahead of time.
        """
        return None

    # No docstring here on purpose: the decorator supplies the one from
    # crypt.Signer.sign.
    @_helpers.copy_docstring(crypt.Signer)
    def sign(self, message):
        # The API returns the signature base64-encoded under "signedBlob".
        signing_response = self._make_signing_request(message)
        encoded_signature = signing_response["signedBlob"]
        return base64.b64decode(encoded_signature)