1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19from google.pubsub_v1 import gapic_version as package_version
20
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
28
29from google.iam.v1 import iam_policy_pb2 # type: ignore
30from google.iam.v1 import policy_pb2 # type: ignore
31from google.protobuf import empty_pb2 # type: ignore
32from google.pubsub_v1.types import schema
33from google.pubsub_v1.types import schema as gp_schema
34
35DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
36 client_library_version=package_version.__version__
37)
38
39
40class SchemaServiceTransport(abc.ABC):
41 """Abstract transport class for SchemaService."""
42
43 AUTH_SCOPES = (
44 "https://www.googleapis.com/auth/cloud-platform",
45 "https://www.googleapis.com/auth/pubsub",
46 )
47
48 DEFAULT_HOST: str = "pubsub.googleapis.com"
49
50 def __init__(
51 self,
52 *,
53 host: str = DEFAULT_HOST,
54 credentials: Optional[ga_credentials.Credentials] = None,
55 credentials_file: Optional[str] = None,
56 scopes: Optional[Sequence[str]] = None,
57 quota_project_id: Optional[str] = None,
58 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
59 always_use_jwt_access: Optional[bool] = False,
60 api_audience: Optional[str] = None,
61 **kwargs,
62 ) -> None:
63 """Instantiate the transport.
64
65 Args:
66 host (Optional[str]):
67 The hostname to connect to (default: 'pubsub.googleapis.com').
68 credentials (Optional[google.auth.credentials.Credentials]): The
69 authorization credentials to attach to requests. These
70 credentials identify the application to the service; if none
71 are specified, the client will attempt to ascertain the
72 credentials from the environment.
73 credentials_file (Optional[str]): A file with credentials that can
74 be loaded with :func:`google.auth.load_credentials_from_file`.
75 This argument is mutually exclusive with credentials.
76 scopes (Optional[Sequence[str]]): A list of scopes.
77 quota_project_id (Optional[str]): An optional project to use for billing
78 and quota.
79 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
80 The client info used to send a user-agent string along with
81 API requests. If ``None``, then default info will be used.
82 Generally, you only need to set this if you're developing
83 your own client library.
84 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
85 be used for service account credentials.
86 """
87
88 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
89
90 # Save the scopes.
91 self._scopes = scopes
92 if not hasattr(self, "_ignore_credentials"):
93 self._ignore_credentials: bool = False
94
95 # If no credentials are provided, then determine the appropriate
96 # defaults.
97 if credentials and credentials_file:
98 raise core_exceptions.DuplicateCredentialArgs(
99 "'credentials_file' and 'credentials' are mutually exclusive"
100 )
101
102 if credentials_file is not None:
103 credentials, _ = google.auth.load_credentials_from_file(
104 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
105 )
106 elif credentials is None and not self._ignore_credentials:
107 credentials, _ = google.auth.default(
108 **scopes_kwargs, quota_project_id=quota_project_id
109 )
110 # Don't apply audience if the credentials file passed from user.
111 if hasattr(credentials, "with_gdch_audience"):
112 credentials = credentials.with_gdch_audience(
113 api_audience if api_audience else host
114 )
115
116 # If the credentials are service account credentials, then always try to use self signed JWT.
117 if (
118 always_use_jwt_access
119 and isinstance(credentials, service_account.Credentials)
120 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
121 ):
122 credentials = credentials.with_always_use_jwt_access(True)
123
124 # Save the credentials.
125 self._credentials = credentials
126
127 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
128 if ":" not in host:
129 host += ":443"
130 self._host = host
131
132 @property
133 def host(self):
134 return self._host
135
136 def _prep_wrapped_messages(self, client_info):
137 # Precompute the wrapped methods.
138 self._wrapped_methods = {
139 self.create_schema: gapic_v1.method.wrap_method(
140 self.create_schema,
141 default_retry=retries.Retry(
142 initial=0.1,
143 maximum=60.0,
144 multiplier=1.3,
145 predicate=retries.if_exception_type(
146 core_exceptions.ServiceUnavailable,
147 ),
148 deadline=60.0,
149 ),
150 default_timeout=60.0,
151 client_info=client_info,
152 ),
153 self.get_schema: gapic_v1.method.wrap_method(
154 self.get_schema,
155 default_retry=retries.Retry(
156 initial=0.1,
157 maximum=60.0,
158 multiplier=1.3,
159 predicate=retries.if_exception_type(
160 core_exceptions.ServiceUnavailable,
161 ),
162 deadline=60.0,
163 ),
164 default_timeout=60.0,
165 client_info=client_info,
166 ),
167 self.list_schemas: gapic_v1.method.wrap_method(
168 self.list_schemas,
169 default_retry=retries.Retry(
170 initial=0.1,
171 maximum=60.0,
172 multiplier=1.3,
173 predicate=retries.if_exception_type(
174 core_exceptions.ServiceUnavailable,
175 ),
176 deadline=60.0,
177 ),
178 default_timeout=60.0,
179 client_info=client_info,
180 ),
181 self.list_schema_revisions: gapic_v1.method.wrap_method(
182 self.list_schema_revisions,
183 default_retry=retries.Retry(
184 initial=0.1,
185 maximum=60.0,
186 multiplier=1.3,
187 predicate=retries.if_exception_type(
188 core_exceptions.ServiceUnavailable,
189 ),
190 deadline=60.0,
191 ),
192 default_timeout=60.0,
193 client_info=client_info,
194 ),
195 self.commit_schema: gapic_v1.method.wrap_method(
196 self.commit_schema,
197 default_retry=retries.Retry(
198 initial=0.1,
199 maximum=60.0,
200 multiplier=1.3,
201 predicate=retries.if_exception_type(
202 core_exceptions.ServiceUnavailable,
203 ),
204 deadline=60.0,
205 ),
206 default_timeout=60.0,
207 client_info=client_info,
208 ),
209 self.rollback_schema: gapic_v1.method.wrap_method(
210 self.rollback_schema,
211 default_retry=retries.Retry(
212 initial=0.1,
213 maximum=60.0,
214 multiplier=1.3,
215 predicate=retries.if_exception_type(
216 core_exceptions.ServiceUnavailable,
217 ),
218 deadline=60.0,
219 ),
220 default_timeout=60.0,
221 client_info=client_info,
222 ),
223 self.delete_schema_revision: gapic_v1.method.wrap_method(
224 self.delete_schema_revision,
225 default_retry=retries.Retry(
226 initial=0.1,
227 maximum=60.0,
228 multiplier=1.3,
229 predicate=retries.if_exception_type(
230 core_exceptions.ServiceUnavailable,
231 ),
232 deadline=60.0,
233 ),
234 default_timeout=60.0,
235 client_info=client_info,
236 ),
237 self.delete_schema: gapic_v1.method.wrap_method(
238 self.delete_schema,
239 default_retry=retries.Retry(
240 initial=0.1,
241 maximum=60.0,
242 multiplier=1.3,
243 predicate=retries.if_exception_type(
244 core_exceptions.ServiceUnavailable,
245 ),
246 deadline=60.0,
247 ),
248 default_timeout=60.0,
249 client_info=client_info,
250 ),
251 self.validate_schema: gapic_v1.method.wrap_method(
252 self.validate_schema,
253 default_retry=retries.Retry(
254 initial=0.1,
255 maximum=60.0,
256 multiplier=1.3,
257 predicate=retries.if_exception_type(
258 core_exceptions.ServiceUnavailable,
259 ),
260 deadline=60.0,
261 ),
262 default_timeout=60.0,
263 client_info=client_info,
264 ),
265 self.validate_message: gapic_v1.method.wrap_method(
266 self.validate_message,
267 default_retry=retries.Retry(
268 initial=0.1,
269 maximum=60.0,
270 multiplier=1.3,
271 predicate=retries.if_exception_type(
272 core_exceptions.ServiceUnavailable,
273 ),
274 deadline=60.0,
275 ),
276 default_timeout=60.0,
277 client_info=client_info,
278 ),
279 self.get_iam_policy: gapic_v1.method.wrap_method(
280 self.get_iam_policy,
281 default_timeout=None,
282 client_info=client_info,
283 ),
284 self.set_iam_policy: gapic_v1.method.wrap_method(
285 self.set_iam_policy,
286 default_timeout=None,
287 client_info=client_info,
288 ),
289 self.test_iam_permissions: gapic_v1.method.wrap_method(
290 self.test_iam_permissions,
291 default_timeout=None,
292 client_info=client_info,
293 ),
294 }
295
296 def close(self):
297 """Closes resources associated with the transport.
298
299 .. warning::
300 Only call this method if the transport is NOT shared
301 with other clients - this may cause errors in other clients!
302 """
303 raise NotImplementedError()
304
305 @property
306 def create_schema(
307 self,
308 ) -> Callable[
309 [gp_schema.CreateSchemaRequest],
310 Union[gp_schema.Schema, Awaitable[gp_schema.Schema]],
311 ]:
312 raise NotImplementedError()
313
314 @property
315 def get_schema(
316 self,
317 ) -> Callable[
318 [schema.GetSchemaRequest], Union[schema.Schema, Awaitable[schema.Schema]]
319 ]:
320 raise NotImplementedError()
321
322 @property
323 def list_schemas(
324 self,
325 ) -> Callable[
326 [schema.ListSchemasRequest],
327 Union[schema.ListSchemasResponse, Awaitable[schema.ListSchemasResponse]],
328 ]:
329 raise NotImplementedError()
330
331 @property
332 def list_schema_revisions(
333 self,
334 ) -> Callable[
335 [schema.ListSchemaRevisionsRequest],
336 Union[
337 schema.ListSchemaRevisionsResponse,
338 Awaitable[schema.ListSchemaRevisionsResponse],
339 ],
340 ]:
341 raise NotImplementedError()
342
343 @property
344 def commit_schema(
345 self,
346 ) -> Callable[
347 [gp_schema.CommitSchemaRequest],
348 Union[gp_schema.Schema, Awaitable[gp_schema.Schema]],
349 ]:
350 raise NotImplementedError()
351
352 @property
353 def rollback_schema(
354 self,
355 ) -> Callable[
356 [schema.RollbackSchemaRequest], Union[schema.Schema, Awaitable[schema.Schema]]
357 ]:
358 raise NotImplementedError()
359
360 @property
361 def delete_schema_revision(
362 self,
363 ) -> Callable[
364 [schema.DeleteSchemaRevisionRequest],
365 Union[schema.Schema, Awaitable[schema.Schema]],
366 ]:
367 raise NotImplementedError()
368
369 @property
370 def delete_schema(
371 self,
372 ) -> Callable[
373 [schema.DeleteSchemaRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
374 ]:
375 raise NotImplementedError()
376
377 @property
378 def validate_schema(
379 self,
380 ) -> Callable[
381 [gp_schema.ValidateSchemaRequest],
382 Union[
383 gp_schema.ValidateSchemaResponse,
384 Awaitable[gp_schema.ValidateSchemaResponse],
385 ],
386 ]:
387 raise NotImplementedError()
388
389 @property
390 def validate_message(
391 self,
392 ) -> Callable[
393 [schema.ValidateMessageRequest],
394 Union[
395 schema.ValidateMessageResponse, Awaitable[schema.ValidateMessageResponse]
396 ],
397 ]:
398 raise NotImplementedError()
399
400 @property
401 def set_iam_policy(
402 self,
403 ) -> Callable[
404 [iam_policy_pb2.SetIamPolicyRequest],
405 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
406 ]:
407 raise NotImplementedError()
408
409 @property
410 def get_iam_policy(
411 self,
412 ) -> Callable[
413 [iam_policy_pb2.GetIamPolicyRequest],
414 Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
415 ]:
416 raise NotImplementedError()
417
418 @property
419 def test_iam_permissions(
420 self,
421 ) -> Callable[
422 [iam_policy_pb2.TestIamPermissionsRequest],
423 Union[
424 iam_policy_pb2.TestIamPermissionsResponse,
425 Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
426 ],
427 ]:
428 raise NotImplementedError()
429
430 @property
431 def kind(self) -> str:
432 raise NotImplementedError()
433
434
435__all__ = ("SchemaServiceTransport",)