1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.cloud.location import locations_pb2 # type: ignore
26from google.iam.v1 import iam_policy_pb2 # type: ignore
27from google.iam.v1 import policy_pb2 # type: ignore
28from google.oauth2 import service_account # type: ignore
29import google.protobuf
30from google.protobuf import empty_pb2 # type: ignore
31
32from google.cloud.tasks_v2 import gapic_version as package_version
33from google.cloud.tasks_v2.types import cloudtasks
34from google.cloud.tasks_v2.types import queue
35from google.cloud.tasks_v2.types import queue as gct_queue
36from google.cloud.tasks_v2.types import task
37from google.cloud.tasks_v2.types import task as gct_task
38
# Client metadata shared as the default ``client_info`` of the transport
# constructor below; it records the version of this generated package.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__,
)

# Record the protobuf runtime version as well, but only when the ClientInfo
# object exposes that attribute (presumably it is absent in older
# google-api-core releases -- verify against the supported version range).
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
45
46
class CloudTasksTransport(abc.ABC):
    """Base transport for the CloudTasks API.

    The constructor resolves the credentials and the ``host:port`` string.
    The class also declares one property per RPC; in this base class each of
    those properties, as well as ``close`` and ``kind``, raises
    ``NotImplementedError``, so a usable transport has to override them.
    """

    AUTH_SCOPES = ("https://www.googleapis.com/auth/cloud-platform",)

    DEFAULT_HOST: str = "cloudtasks.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Resolve credentials and host for the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default:
                'cloudtasks.googleapis.com'). ``":443"`` is appended when the
                value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. When neither
                this nor ``credentials_file`` is given, they are obtained via
                :func:`google.auth.default`, unless the instance already
                carries a truthy ``_ignore_credentials`` attribute.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): A list of scopes. Passed to the
                credential loaders together with ``AUTH_SCOPES`` as the
                default scopes.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. This constructor does not read it; it is the
                argument expected by ``_prep_wrapped_messages``.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when default credentials offer that
                method; ``host`` (before any port is appended) is used when
                this is empty or ``None``.
            **kwargs: Accepted for compatibility; not used by this
                constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Remember the scopes exactly as the caller passed them.
        self._scopes = scopes

        # Keep an ``_ignore_credentials`` value that is already present on the
        # instance (presumably set by a subclass before calling this
        # constructor); otherwise default to looking credentials up.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # Keyword arguments common to both credential loaders.
        auth_kwargs = {
            "scopes": scopes,
            "default_scopes": self.AUTH_SCOPES,
            "quota_project_id": quota_project_id,
        }

        if credentials_file is not None:
            credentials, _project = google.auth.load_credentials_from_file(
                credentials_file, **auth_kwargs
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _project = google.auth.default(**auth_kwargs)
            # The audience is only applied to default credentials, never to
            # ones the user supplied directly or through a file.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Switch service account credentials to self signed JWT when asked to
        # and when the installed google-auth offers the method.
        wants_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if wants_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Default to port 443 (HTTPS) if the host names no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """Return the ``host:port`` string stored by the constructor."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped form of every RPC callable.

        Fills ``self._wrapped_methods`` with a dict whose keys are the values
        of the RPC properties and whose values are those same callables passed
        through ``gapic_v1.method.wrap_method`` with a default timeout, the
        given ``client_info`` and, for a subset of the RPCs, a default retry.

        Args:
            client_info: Forwarded unchanged as ``client_info`` to every
                ``wrap_method`` call.
        """
        wrap = gapic_v1.method.wrap_method

        def standard_retry():
            # Builds a new Retry for each RPC that uses one, so no Retry
            # object is shared between entries. It retries on
            # DeadlineExceeded and ServiceUnavailable.
            return retries.Retry(
                initial=0.1,
                maximum=10.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=20.0,
            )

        def wrapped(rpc, *, retry=False, timeout=20.0):
            # ``default_retry`` is only passed for RPCs that have one; the
            # other RPCs rely on wrap_method's own default for it.
            options = {}
            if retry:
                options["default_retry"] = standard_retry()
            options["default_timeout"] = timeout
            options["client_info"] = client_info
            return wrap(rpc, **options)

        self._wrapped_methods = {
            self.list_queues: wrapped(self.list_queues, retry=True),
            self.get_queue: wrapped(self.get_queue, retry=True),
            self.create_queue: wrapped(self.create_queue),
            self.update_queue: wrapped(self.update_queue),
            self.delete_queue: wrapped(self.delete_queue, retry=True),
            self.purge_queue: wrapped(self.purge_queue),
            self.pause_queue: wrapped(self.pause_queue),
            self.resume_queue: wrapped(self.resume_queue),
            self.get_iam_policy: wrapped(self.get_iam_policy, retry=True),
            self.set_iam_policy: wrapped(self.set_iam_policy),
            self.test_iam_permissions: wrapped(
                self.test_iam_permissions, retry=True
            ),
            self.list_tasks: wrapped(self.list_tasks, retry=True),
            self.get_task: wrapped(self.get_task, retry=True),
            self.create_task: wrapped(self.create_task),
            self.delete_task: wrapped(self.delete_task, retry=True),
            self.run_task: wrapped(self.run_task),
            self.get_location: wrapped(self.get_location, timeout=None),
            self.list_locations: wrapped(self.list_locations, timeout=None),
        }

    def close(self):
        """Closes resources associated with the transport.

        Not implemented in this base class.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def list_queues(
        self,
    ) -> Callable[
        [cloudtasks.ListQueuesRequest],
        Union[cloudtasks.ListQueuesResponse, Awaitable[cloudtasks.ListQueuesResponse]],
    ]:
        """Callable for the list-queues RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def get_queue(
        self,
    ) -> Callable[
        [cloudtasks.GetQueueRequest], Union[queue.Queue, Awaitable[queue.Queue]]
    ]:
        """Callable for the get-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def create_queue(
        self,
    ) -> Callable[
        [cloudtasks.CreateQueueRequest],
        Union[gct_queue.Queue, Awaitable[gct_queue.Queue]],
    ]:
        """Callable for the create-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def update_queue(
        self,
    ) -> Callable[
        [cloudtasks.UpdateQueueRequest],
        Union[gct_queue.Queue, Awaitable[gct_queue.Queue]],
    ]:
        """Callable for the update-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def delete_queue(
        self,
    ) -> Callable[
        [cloudtasks.DeleteQueueRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the delete-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def purge_queue(
        self,
    ) -> Callable[
        [cloudtasks.PurgeQueueRequest], Union[queue.Queue, Awaitable[queue.Queue]]
    ]:
        """Callable for the purge-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def pause_queue(
        self,
    ) -> Callable[
        [cloudtasks.PauseQueueRequest], Union[queue.Queue, Awaitable[queue.Queue]]
    ]:
        """Callable for the pause-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def resume_queue(
        self,
    ) -> Callable[
        [cloudtasks.ResumeQueueRequest], Union[queue.Queue, Awaitable[queue.Queue]]
    ]:
        """Callable for the resume-queue RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the get-IAM-policy RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the set-IAM-policy RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for the test-IAM-permissions RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def list_tasks(
        self,
    ) -> Callable[
        [cloudtasks.ListTasksRequest],
        Union[cloudtasks.ListTasksResponse, Awaitable[cloudtasks.ListTasksResponse]],
    ]:
        """Callable for the list-tasks RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def get_task(
        self,
    ) -> Callable[[cloudtasks.GetTaskRequest], Union[task.Task, Awaitable[task.Task]]]:
        """Callable for the get-task RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def create_task(
        self,
    ) -> Callable[
        [cloudtasks.CreateTaskRequest], Union[gct_task.Task, Awaitable[gct_task.Task]]
    ]:
        """Callable for the create-task RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def delete_task(
        self,
    ) -> Callable[
        [cloudtasks.DeleteTaskRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the delete-task RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def run_task(
        self,
    ) -> Callable[[cloudtasks.RunTaskRequest], Union[task.Task, Awaitable[task.Task]]]:
        """Callable for the run-task RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def get_location(
        self,
    ) -> Callable[
        [locations_pb2.GetLocationRequest],
        Union[locations_pb2.Location, Awaitable[locations_pb2.Location]],
    ]:
        """Callable for the get-location RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def list_locations(
        self,
    ) -> Callable[
        [locations_pb2.ListLocationsRequest],
        Union[
            locations_pb2.ListLocationsResponse,
            Awaitable[locations_pb2.ListLocationsResponse],
        ],
    ]:
        """Callable for the list-locations RPC; not implemented in this class."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String identifying the transport kind; not implemented here."""
        raise NotImplementedError()
484
485
# Names exported by ``from <this module> import *``: only the transport class.
__all__ = ("CloudTasksTransport",)