1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.oauth2 import service_account # type: ignore
26import google.protobuf
27
28from google.cloud.iam_credentials_v1 import gapic_version as package_version
29from google.cloud.iam_credentials_v1.types import common
30
# Default client metadata for this package; it records the GAPIC version
# taken from the package's own version module.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__,
)

# Stamp the protobuf runtime version onto the default client info, but only
# when the ClientInfo object actually exposes that attribute (presumably it
# is absent in some google-api-core releases -- hence the probe).
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
37
38
class IAMCredentialsTransport(abc.ABC):
    """Base class shared by every IAMCredentials transport.

    The constructor resolves the credentials and the target host. The RPC
    callables are declared as properties whose base implementations raise
    ``NotImplementedError``.
    """

    AUTH_SCOPES = ("https://www.googleapis.com/auth/cloud-platform",)

    DEFAULT_HOST: str = "iamcredentials.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Resolve credentials and host for the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default:
                 'iamcredentials.googleapis.com'). ``":443"`` is appended
                 when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]):
                The authorization credentials to attach to requests. When
                neither this nor ``credentials_file`` is given (and the
                instance does not set ``_ignore_credentials``), they are
                obtained through :func:`google.auth.default`.
            credentials_file (Optional[str]): Deprecated. A file with
                credentials that can be loaded with
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): A list of scopes. Stored on
                the instance and forwarded to the credential lookup.
            quota_project_id (Optional[str]): An optional project to use
                for billing and quota; forwarded to the credential lookup.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along
                with API requests. This constructor itself does not read
                it.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when the default credentials offer
                that method; ``host`` is used when this is empty.
            **kwargs: Accepted but not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """

        # Remember the scopes requested by the caller.
        self._scopes = scopes

        # Only install the default when nothing (e.g. a subclass) has
        # defined the flag before this constructor runs.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        # The two explicit credential sources cannot be combined.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # Keyword arguments common to both credential-loading calls below.
        lookup_options = dict(
            scopes=scopes,
            quota_project_id=quota_project_id,
            default_scopes=self.AUTH_SCOPES,
        )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **lookup_options
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(**lookup_options)
            # The audience is applied only on this branch, i.e. never to
            # credentials that came from a user-supplied file or object.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Service account credentials are switched to self signed JWT when
        # requested and when the installed google-auth supports it.
        wants_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if wants_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # A host without an explicit port gets 443 (HTTPS).
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """The host stored by the constructor (always contains a ``":"``)."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Populate ``self._wrapped_methods``.

        Each of the four RPC callables is used as a key and mapped to the
        result of ``gapic_v1.method.wrap_method`` for that callable, with
        the same default retry, default timeout and the given
        ``client_info``.
        """

        def _default_retry():
            # A fresh Retry object per method; all share the same settings.
            return retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=60.0,
            )

        def _wrap(rpc):
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=_default_retry(),
                default_timeout=60.0,
                client_info=client_info,
            )

        self._wrapped_methods = {
            self.generate_access_token: _wrap(self.generate_access_token),
            self.generate_id_token: _wrap(self.generate_id_token),
            self.sign_blob: _wrap(self.sign_blob),
            self.sign_jwt: _wrap(self.sign_jwt),
        }

    def close(self):
        """Closes resources associated with the transport.

        The base class has nothing to close and raises
        ``NotImplementedError``.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def generate_access_token(
        self,
    ) -> Callable[
        [common.GenerateAccessTokenRequest],
        Union[
            common.GenerateAccessTokenResponse,
            Awaitable[common.GenerateAccessTokenResponse],
        ],
    ]:
        """Callable taking a ``GenerateAccessTokenRequest``.

        Not implemented on the base class.
        """
        raise NotImplementedError()

    @property
    def generate_id_token(
        self,
    ) -> Callable[
        [common.GenerateIdTokenRequest],
        Union[
            common.GenerateIdTokenResponse, Awaitable[common.GenerateIdTokenResponse]
        ],
    ]:
        """Callable taking a ``GenerateIdTokenRequest``.

        Not implemented on the base class.
        """
        raise NotImplementedError()

    @property
    def sign_blob(
        self,
    ) -> Callable[
        [common.SignBlobRequest],
        Union[common.SignBlobResponse, Awaitable[common.SignBlobResponse]],
    ]:
        """Callable taking a ``SignBlobRequest``.

        Not implemented on the base class.
        """
        raise NotImplementedError()

    @property
    def sign_jwt(
        self,
    ) -> Callable[
        [common.SignJwtRequest],
        Union[common.SignJwtResponse, Awaitable[common.SignJwtResponse]],
    ]:
        """Callable taking a ``SignJwtRequest``.

        Not implemented on the base class.
        """
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String identifying the transport; not implemented on the base class."""
        raise NotImplementedError()
254
255
# Names exported by ``from <this module> import *``: only the base transport.
__all__ = ("IAMCredentialsTransport",)