1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20import google.auth # type: ignore
21import google.protobuf
22from google.api_core import exceptions as core_exceptions
23from google.api_core import gapic_v1
24from google.api_core import retry as retries
25from google.auth import credentials as ga_credentials # type: ignore
26from google.oauth2 import service_account # type: ignore
27
28from google.cloud.iam_credentials_v1 import gapic_version as package_version
29from google.cloud.iam_credentials_v1.types import common
30
31DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
32 gapic_version=package_version.__version__
33)
34
35if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
36 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
37
38
39class IAMCredentialsTransport(abc.ABC):
40 """Abstract transport class for IAMCredentials."""
41
42 AUTH_SCOPES = ("https://www.googleapis.com/auth/cloud-platform",)
43
44 DEFAULT_HOST: str = "iamcredentials.googleapis.com"
45
46 def __init__(
47 self,
48 *,
49 host: str = DEFAULT_HOST,
50 credentials: Optional[ga_credentials.Credentials] = None,
51 credentials_file: Optional[str] = None,
52 scopes: Optional[Sequence[str]] = None,
53 quota_project_id: Optional[str] = None,
54 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
55 always_use_jwt_access: Optional[bool] = False,
56 api_audience: Optional[str] = None,
57 **kwargs,
58 ) -> None:
59 """Instantiate the transport.
60
61 Args:
62 host (Optional[str]):
63 The hostname to connect to (default: 'iamcredentials.googleapis.com').
64 credentials (Optional[google.auth.credentials.Credentials]): The
65 authorization credentials to attach to requests. These
66 credentials identify the application to the service; if none
67 are specified, the client will attempt to ascertain the
68 credentials from the environment.
69 credentials_file (Optional[str]): Deprecated. A file with credentials that can
70 be loaded with :func:`google.auth.load_credentials_from_file`.
71 This argument is mutually exclusive with credentials. This argument will be
72 removed in the next major version of this library.
73 scopes (Optional[Sequence[str]]): A list of scopes.
74 quota_project_id (Optional[str]): An optional project to use for billing
75 and quota.
76 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
77 The client info used to send a user-agent string along with
78 API requests. If ``None``, then default info will be used.
79 Generally, you only need to set this if you're developing
80 your own client library.
81 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
82 be used for service account credentials.
83 api_audience (Optional[str]): The intended audience for the API calls
84 to the service that will be set when using certain 3rd party
85 authentication flows. Audience is typically a resource identifier.
86 If not set, the host value will be used as a default.
87 """
88
89 # Save the scopes.
90 self._scopes = scopes
91 if not hasattr(self, "_ignore_credentials"):
92 self._ignore_credentials: bool = False
93
94 # If no credentials are provided, then determine the appropriate
95 # defaults.
96 if credentials and credentials_file:
97 raise core_exceptions.DuplicateCredentialArgs(
98 "'credentials_file' and 'credentials' are mutually exclusive"
99 )
100
101 if credentials_file is not None:
102 credentials, _ = google.auth.load_credentials_from_file(
103 credentials_file,
104 scopes=scopes,
105 quota_project_id=quota_project_id,
106 default_scopes=self.AUTH_SCOPES,
107 )
108 elif credentials is None and not self._ignore_credentials:
109 credentials, _ = google.auth.default(
110 scopes=scopes,
111 quota_project_id=quota_project_id,
112 default_scopes=self.AUTH_SCOPES,
113 )
114 # Don't apply audience if the credentials file passed from user.
115 if hasattr(credentials, "with_gdch_audience"):
116 credentials = credentials.with_gdch_audience(
117 api_audience if api_audience else host
118 )
119
120 # If the credentials are service account credentials, then always try to use self signed JWT.
121 if (
122 always_use_jwt_access
123 and isinstance(credentials, service_account.Credentials)
124 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
125 ):
126 credentials = credentials.with_always_use_jwt_access(True)
127
128 # Save the credentials.
129 self._credentials = credentials
130
131 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
132 if ":" not in host:
133 host += ":443"
134 self._host = host
135
136 self._wrapped_methods: Dict[Callable, Callable] = {}
137
138 @property
139 def host(self):
140 return self._host
141
142 def _prep_wrapped_messages(self, client_info):
143 # Precompute the wrapped methods.
144 self._wrapped_methods = {
145 self.generate_access_token: gapic_v1.method.wrap_method(
146 self.generate_access_token,
147 default_retry=retries.Retry(
148 initial=0.1,
149 maximum=60.0,
150 multiplier=1.3,
151 predicate=retries.if_exception_type(
152 core_exceptions.DeadlineExceeded,
153 core_exceptions.ServiceUnavailable,
154 ),
155 deadline=60.0,
156 ),
157 default_timeout=60.0,
158 client_info=client_info,
159 ),
160 self.generate_id_token: gapic_v1.method.wrap_method(
161 self.generate_id_token,
162 default_retry=retries.Retry(
163 initial=0.1,
164 maximum=60.0,
165 multiplier=1.3,
166 predicate=retries.if_exception_type(
167 core_exceptions.DeadlineExceeded,
168 core_exceptions.ServiceUnavailable,
169 ),
170 deadline=60.0,
171 ),
172 default_timeout=60.0,
173 client_info=client_info,
174 ),
175 self.sign_blob: gapic_v1.method.wrap_method(
176 self.sign_blob,
177 default_retry=retries.Retry(
178 initial=0.1,
179 maximum=60.0,
180 multiplier=1.3,
181 predicate=retries.if_exception_type(
182 core_exceptions.DeadlineExceeded,
183 core_exceptions.ServiceUnavailable,
184 ),
185 deadline=60.0,
186 ),
187 default_timeout=60.0,
188 client_info=client_info,
189 ),
190 self.sign_jwt: gapic_v1.method.wrap_method(
191 self.sign_jwt,
192 default_retry=retries.Retry(
193 initial=0.1,
194 maximum=60.0,
195 multiplier=1.3,
196 predicate=retries.if_exception_type(
197 core_exceptions.DeadlineExceeded,
198 core_exceptions.ServiceUnavailable,
199 ),
200 deadline=60.0,
201 ),
202 default_timeout=60.0,
203 client_info=client_info,
204 ),
205 }
206
207 def close(self):
208 """Closes resources associated with the transport.
209
210 .. warning::
211 Only call this method if the transport is NOT shared
212 with other clients - this may cause errors in other clients!
213 """
214 raise NotImplementedError()
215
216 @property
217 def generate_access_token(
218 self,
219 ) -> Callable[
220 [common.GenerateAccessTokenRequest],
221 Union[
222 common.GenerateAccessTokenResponse,
223 Awaitable[common.GenerateAccessTokenResponse],
224 ],
225 ]:
226 raise NotImplementedError()
227
228 @property
229 def generate_id_token(
230 self,
231 ) -> Callable[
232 [common.GenerateIdTokenRequest],
233 Union[
234 common.GenerateIdTokenResponse, Awaitable[common.GenerateIdTokenResponse]
235 ],
236 ]:
237 raise NotImplementedError()
238
239 @property
240 def sign_blob(
241 self,
242 ) -> Callable[
243 [common.SignBlobRequest],
244 Union[common.SignBlobResponse, Awaitable[common.SignBlobResponse]],
245 ]:
246 raise NotImplementedError()
247
248 @property
249 def sign_jwt(
250 self,
251 ) -> Callable[
252 [common.SignJwtRequest],
253 Union[common.SignJwtResponse, Awaitable[common.SignJwtResponse]],
254 ]:
255 raise NotImplementedError()
256
257 @property
258 def kind(self) -> str:
259 raise NotImplementedError()
260
261
262__all__ = ("IAMCredentialsTransport",)