1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json # type: ignore
17import re
18from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
19
20from google.api_core import gapic_v1, path_template
21from google.cloud.location import locations_pb2 # type: ignore
22from google.iam.v1 import iam_policy_pb2 # type: ignore
23from google.iam.v1 import policy_pb2 # type: ignore
24from google.protobuf import empty_pb2 # type: ignore
25from google.protobuf import json_format
26
27from google.cloud.tasks_v2.types import cloudtasks
28from google.cloud.tasks_v2.types import queue
29from google.cloud.tasks_v2.types import queue as gct_queue
30from google.cloud.tasks_v2.types import task
31from google.cloud.tasks_v2.types import task as gct_task
32
33from .base import DEFAULT_CLIENT_INFO, CloudTasksTransport
34
35
36class _BaseCloudTasksRestTransport(CloudTasksTransport):
37 """Base REST backend transport for CloudTasks.
38
39 Note: This class is not meant to be used directly. Use its sync and
40 async sub-classes instead.
41
42 This class defines the same methods as the primary client, so the
43 primary client can load the underlying transport implementation
44 and call it.
45
46 It sends JSON representations of protocol buffers over HTTP/1.1
47 """
48
49 def __init__(
50 self,
51 *,
52 host: str = "cloudtasks.googleapis.com",
53 credentials: Optional[Any] = None,
54 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
55 always_use_jwt_access: Optional[bool] = False,
56 url_scheme: str = "https",
57 api_audience: Optional[str] = None,
58 ) -> None:
59 """Instantiate the transport.
60 Args:
61 host (Optional[str]):
62 The hostname to connect to (default: 'cloudtasks.googleapis.com').
63 credentials (Optional[Any]): The
64 authorization credentials to attach to requests. These
65 credentials identify the application to the service; if none
66 are specified, the client will attempt to ascertain the
67 credentials from the environment.
68 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
69 The client info used to send a user-agent string along with
70 API requests. If ``None``, then default info will be used.
71 Generally, you only need to set this if you are developing
72 your own client library.
73 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
74 be used for service account credentials.
75 url_scheme: the protocol scheme for the API endpoint. Normally
76 "https", but for testing or local servers,
77 "http" can be specified.
78 """
79 # Run the base constructor
80 maybe_url_match = re.match("^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
81 if maybe_url_match is None:
82 raise ValueError(
83 f"Unexpected hostname structure: {host}"
84 ) # pragma: NO COVER
85
86 url_match_items = maybe_url_match.groupdict()
87
88 host = f"{url_scheme}://{host}" if not url_match_items["scheme"] else host
89
90 super().__init__(
91 host=host,
92 credentials=credentials,
93 client_info=client_info,
94 always_use_jwt_access=always_use_jwt_access,
95 api_audience=api_audience,
96 )
97
98 class _BaseCreateQueue:
99 def __hash__(self): # pragma: NO COVER
100 return NotImplementedError("__hash__ must be implemented.")
101
102 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
103
104 @classmethod
105 def _get_unset_required_fields(cls, message_dict):
106 return {
107 k: v
108 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
109 if k not in message_dict
110 }
111
112 @staticmethod
113 def _get_http_options():
114 http_options: List[Dict[str, str]] = [
115 {
116 "method": "post",
117 "uri": "/v2/{parent=projects/*/locations/*}/queues",
118 "body": "queue",
119 },
120 ]
121 return http_options
122
123 @staticmethod
124 def _get_transcoded_request(http_options, request):
125 pb_request = cloudtasks.CreateQueueRequest.pb(request)
126 transcoded_request = path_template.transcode(http_options, pb_request)
127 return transcoded_request
128
129 @staticmethod
130 def _get_request_body_json(transcoded_request):
131 # Jsonify the request body
132
133 body = json_format.MessageToJson(
134 transcoded_request["body"], use_integers_for_enums=True
135 )
136 return body
137
138 @staticmethod
139 def _get_query_params_json(transcoded_request):
140 query_params = json.loads(
141 json_format.MessageToJson(
142 transcoded_request["query_params"],
143 use_integers_for_enums=True,
144 )
145 )
146 query_params.update(
147 _BaseCloudTasksRestTransport._BaseCreateQueue._get_unset_required_fields(
148 query_params
149 )
150 )
151
152 query_params["$alt"] = "json;enum-encoding=int"
153 return query_params
154
155 class _BaseCreateTask:
156 def __hash__(self): # pragma: NO COVER
157 return NotImplementedError("__hash__ must be implemented.")
158
159 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
160
161 @classmethod
162 def _get_unset_required_fields(cls, message_dict):
163 return {
164 k: v
165 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
166 if k not in message_dict
167 }
168
169 @staticmethod
170 def _get_http_options():
171 http_options: List[Dict[str, str]] = [
172 {
173 "method": "post",
174 "uri": "/v2/{parent=projects/*/locations/*/queues/*}/tasks",
175 "body": "*",
176 },
177 ]
178 return http_options
179
180 @staticmethod
181 def _get_transcoded_request(http_options, request):
182 pb_request = cloudtasks.CreateTaskRequest.pb(request)
183 transcoded_request = path_template.transcode(http_options, pb_request)
184 return transcoded_request
185
186 @staticmethod
187 def _get_request_body_json(transcoded_request):
188 # Jsonify the request body
189
190 body = json_format.MessageToJson(
191 transcoded_request["body"], use_integers_for_enums=True
192 )
193 return body
194
195 @staticmethod
196 def _get_query_params_json(transcoded_request):
197 query_params = json.loads(
198 json_format.MessageToJson(
199 transcoded_request["query_params"],
200 use_integers_for_enums=True,
201 )
202 )
203 query_params.update(
204 _BaseCloudTasksRestTransport._BaseCreateTask._get_unset_required_fields(
205 query_params
206 )
207 )
208
209 query_params["$alt"] = "json;enum-encoding=int"
210 return query_params
211
212 class _BaseDeleteQueue:
213 def __hash__(self): # pragma: NO COVER
214 return NotImplementedError("__hash__ must be implemented.")
215
216 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
217
218 @classmethod
219 def _get_unset_required_fields(cls, message_dict):
220 return {
221 k: v
222 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
223 if k not in message_dict
224 }
225
226 @staticmethod
227 def _get_http_options():
228 http_options: List[Dict[str, str]] = [
229 {
230 "method": "delete",
231 "uri": "/v2/{name=projects/*/locations/*/queues/*}",
232 },
233 ]
234 return http_options
235
236 @staticmethod
237 def _get_transcoded_request(http_options, request):
238 pb_request = cloudtasks.DeleteQueueRequest.pb(request)
239 transcoded_request = path_template.transcode(http_options, pb_request)
240 return transcoded_request
241
242 @staticmethod
243 def _get_query_params_json(transcoded_request):
244 query_params = json.loads(
245 json_format.MessageToJson(
246 transcoded_request["query_params"],
247 use_integers_for_enums=True,
248 )
249 )
250 query_params.update(
251 _BaseCloudTasksRestTransport._BaseDeleteQueue._get_unset_required_fields(
252 query_params
253 )
254 )
255
256 query_params["$alt"] = "json;enum-encoding=int"
257 return query_params
258
259 class _BaseDeleteTask:
260 def __hash__(self): # pragma: NO COVER
261 return NotImplementedError("__hash__ must be implemented.")
262
263 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
264
265 @classmethod
266 def _get_unset_required_fields(cls, message_dict):
267 return {
268 k: v
269 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
270 if k not in message_dict
271 }
272
273 @staticmethod
274 def _get_http_options():
275 http_options: List[Dict[str, str]] = [
276 {
277 "method": "delete",
278 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}",
279 },
280 ]
281 return http_options
282
283 @staticmethod
284 def _get_transcoded_request(http_options, request):
285 pb_request = cloudtasks.DeleteTaskRequest.pb(request)
286 transcoded_request = path_template.transcode(http_options, pb_request)
287 return transcoded_request
288
289 @staticmethod
290 def _get_query_params_json(transcoded_request):
291 query_params = json.loads(
292 json_format.MessageToJson(
293 transcoded_request["query_params"],
294 use_integers_for_enums=True,
295 )
296 )
297 query_params.update(
298 _BaseCloudTasksRestTransport._BaseDeleteTask._get_unset_required_fields(
299 query_params
300 )
301 )
302
303 query_params["$alt"] = "json;enum-encoding=int"
304 return query_params
305
306 class _BaseGetIamPolicy:
307 def __hash__(self): # pragma: NO COVER
308 return NotImplementedError("__hash__ must be implemented.")
309
310 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
311
312 @classmethod
313 def _get_unset_required_fields(cls, message_dict):
314 return {
315 k: v
316 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
317 if k not in message_dict
318 }
319
320 @staticmethod
321 def _get_http_options():
322 http_options: List[Dict[str, str]] = [
323 {
324 "method": "post",
325 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:getIamPolicy",
326 "body": "*",
327 },
328 ]
329 return http_options
330
331 @staticmethod
332 def _get_transcoded_request(http_options, request):
333 pb_request = request
334 transcoded_request = path_template.transcode(http_options, pb_request)
335 return transcoded_request
336
337 @staticmethod
338 def _get_request_body_json(transcoded_request):
339 # Jsonify the request body
340
341 body = json_format.MessageToJson(
342 transcoded_request["body"], use_integers_for_enums=True
343 )
344 return body
345
346 @staticmethod
347 def _get_query_params_json(transcoded_request):
348 query_params = json.loads(
349 json_format.MessageToJson(
350 transcoded_request["query_params"],
351 use_integers_for_enums=True,
352 )
353 )
354 query_params.update(
355 _BaseCloudTasksRestTransport._BaseGetIamPolicy._get_unset_required_fields(
356 query_params
357 )
358 )
359
360 query_params["$alt"] = "json;enum-encoding=int"
361 return query_params
362
363 class _BaseGetQueue:
364 def __hash__(self): # pragma: NO COVER
365 return NotImplementedError("__hash__ must be implemented.")
366
367 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
368
369 @classmethod
370 def _get_unset_required_fields(cls, message_dict):
371 return {
372 k: v
373 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
374 if k not in message_dict
375 }
376
377 @staticmethod
378 def _get_http_options():
379 http_options: List[Dict[str, str]] = [
380 {
381 "method": "get",
382 "uri": "/v2/{name=projects/*/locations/*/queues/*}",
383 },
384 ]
385 return http_options
386
387 @staticmethod
388 def _get_transcoded_request(http_options, request):
389 pb_request = cloudtasks.GetQueueRequest.pb(request)
390 transcoded_request = path_template.transcode(http_options, pb_request)
391 return transcoded_request
392
393 @staticmethod
394 def _get_query_params_json(transcoded_request):
395 query_params = json.loads(
396 json_format.MessageToJson(
397 transcoded_request["query_params"],
398 use_integers_for_enums=True,
399 )
400 )
401 query_params.update(
402 _BaseCloudTasksRestTransport._BaseGetQueue._get_unset_required_fields(
403 query_params
404 )
405 )
406
407 query_params["$alt"] = "json;enum-encoding=int"
408 return query_params
409
410 class _BaseGetTask:
411 def __hash__(self): # pragma: NO COVER
412 return NotImplementedError("__hash__ must be implemented.")
413
414 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
415
416 @classmethod
417 def _get_unset_required_fields(cls, message_dict):
418 return {
419 k: v
420 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
421 if k not in message_dict
422 }
423
424 @staticmethod
425 def _get_http_options():
426 http_options: List[Dict[str, str]] = [
427 {
428 "method": "get",
429 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}",
430 },
431 ]
432 return http_options
433
434 @staticmethod
435 def _get_transcoded_request(http_options, request):
436 pb_request = cloudtasks.GetTaskRequest.pb(request)
437 transcoded_request = path_template.transcode(http_options, pb_request)
438 return transcoded_request
439
440 @staticmethod
441 def _get_query_params_json(transcoded_request):
442 query_params = json.loads(
443 json_format.MessageToJson(
444 transcoded_request["query_params"],
445 use_integers_for_enums=True,
446 )
447 )
448 query_params.update(
449 _BaseCloudTasksRestTransport._BaseGetTask._get_unset_required_fields(
450 query_params
451 )
452 )
453
454 query_params["$alt"] = "json;enum-encoding=int"
455 return query_params
456
457 class _BaseListQueues:
458 def __hash__(self): # pragma: NO COVER
459 return NotImplementedError("__hash__ must be implemented.")
460
461 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
462
463 @classmethod
464 def _get_unset_required_fields(cls, message_dict):
465 return {
466 k: v
467 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
468 if k not in message_dict
469 }
470
471 @staticmethod
472 def _get_http_options():
473 http_options: List[Dict[str, str]] = [
474 {
475 "method": "get",
476 "uri": "/v2/{parent=projects/*/locations/*}/queues",
477 },
478 ]
479 return http_options
480
481 @staticmethod
482 def _get_transcoded_request(http_options, request):
483 pb_request = cloudtasks.ListQueuesRequest.pb(request)
484 transcoded_request = path_template.transcode(http_options, pb_request)
485 return transcoded_request
486
487 @staticmethod
488 def _get_query_params_json(transcoded_request):
489 query_params = json.loads(
490 json_format.MessageToJson(
491 transcoded_request["query_params"],
492 use_integers_for_enums=True,
493 )
494 )
495 query_params.update(
496 _BaseCloudTasksRestTransport._BaseListQueues._get_unset_required_fields(
497 query_params
498 )
499 )
500
501 query_params["$alt"] = "json;enum-encoding=int"
502 return query_params
503
504 class _BaseListTasks:
505 def __hash__(self): # pragma: NO COVER
506 return NotImplementedError("__hash__ must be implemented.")
507
508 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
509
510 @classmethod
511 def _get_unset_required_fields(cls, message_dict):
512 return {
513 k: v
514 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
515 if k not in message_dict
516 }
517
518 @staticmethod
519 def _get_http_options():
520 http_options: List[Dict[str, str]] = [
521 {
522 "method": "get",
523 "uri": "/v2/{parent=projects/*/locations/*/queues/*}/tasks",
524 },
525 ]
526 return http_options
527
528 @staticmethod
529 def _get_transcoded_request(http_options, request):
530 pb_request = cloudtasks.ListTasksRequest.pb(request)
531 transcoded_request = path_template.transcode(http_options, pb_request)
532 return transcoded_request
533
534 @staticmethod
535 def _get_query_params_json(transcoded_request):
536 query_params = json.loads(
537 json_format.MessageToJson(
538 transcoded_request["query_params"],
539 use_integers_for_enums=True,
540 )
541 )
542 query_params.update(
543 _BaseCloudTasksRestTransport._BaseListTasks._get_unset_required_fields(
544 query_params
545 )
546 )
547
548 query_params["$alt"] = "json;enum-encoding=int"
549 return query_params
550
551 class _BasePauseQueue:
552 def __hash__(self): # pragma: NO COVER
553 return NotImplementedError("__hash__ must be implemented.")
554
555 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
556
557 @classmethod
558 def _get_unset_required_fields(cls, message_dict):
559 return {
560 k: v
561 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
562 if k not in message_dict
563 }
564
565 @staticmethod
566 def _get_http_options():
567 http_options: List[Dict[str, str]] = [
568 {
569 "method": "post",
570 "uri": "/v2/{name=projects/*/locations/*/queues/*}:pause",
571 "body": "*",
572 },
573 ]
574 return http_options
575
576 @staticmethod
577 def _get_transcoded_request(http_options, request):
578 pb_request = cloudtasks.PauseQueueRequest.pb(request)
579 transcoded_request = path_template.transcode(http_options, pb_request)
580 return transcoded_request
581
582 @staticmethod
583 def _get_request_body_json(transcoded_request):
584 # Jsonify the request body
585
586 body = json_format.MessageToJson(
587 transcoded_request["body"], use_integers_for_enums=True
588 )
589 return body
590
591 @staticmethod
592 def _get_query_params_json(transcoded_request):
593 query_params = json.loads(
594 json_format.MessageToJson(
595 transcoded_request["query_params"],
596 use_integers_for_enums=True,
597 )
598 )
599 query_params.update(
600 _BaseCloudTasksRestTransport._BasePauseQueue._get_unset_required_fields(
601 query_params
602 )
603 )
604
605 query_params["$alt"] = "json;enum-encoding=int"
606 return query_params
607
608 class _BasePurgeQueue:
609 def __hash__(self): # pragma: NO COVER
610 return NotImplementedError("__hash__ must be implemented.")
611
612 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
613
614 @classmethod
615 def _get_unset_required_fields(cls, message_dict):
616 return {
617 k: v
618 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
619 if k not in message_dict
620 }
621
622 @staticmethod
623 def _get_http_options():
624 http_options: List[Dict[str, str]] = [
625 {
626 "method": "post",
627 "uri": "/v2/{name=projects/*/locations/*/queues/*}:purge",
628 "body": "*",
629 },
630 ]
631 return http_options
632
633 @staticmethod
634 def _get_transcoded_request(http_options, request):
635 pb_request = cloudtasks.PurgeQueueRequest.pb(request)
636 transcoded_request = path_template.transcode(http_options, pb_request)
637 return transcoded_request
638
639 @staticmethod
640 def _get_request_body_json(transcoded_request):
641 # Jsonify the request body
642
643 body = json_format.MessageToJson(
644 transcoded_request["body"], use_integers_for_enums=True
645 )
646 return body
647
648 @staticmethod
649 def _get_query_params_json(transcoded_request):
650 query_params = json.loads(
651 json_format.MessageToJson(
652 transcoded_request["query_params"],
653 use_integers_for_enums=True,
654 )
655 )
656 query_params.update(
657 _BaseCloudTasksRestTransport._BasePurgeQueue._get_unset_required_fields(
658 query_params
659 )
660 )
661
662 query_params["$alt"] = "json;enum-encoding=int"
663 return query_params
664
665 class _BaseResumeQueue:
666 def __hash__(self): # pragma: NO COVER
667 return NotImplementedError("__hash__ must be implemented.")
668
669 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
670
671 @classmethod
672 def _get_unset_required_fields(cls, message_dict):
673 return {
674 k: v
675 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
676 if k not in message_dict
677 }
678
679 @staticmethod
680 def _get_http_options():
681 http_options: List[Dict[str, str]] = [
682 {
683 "method": "post",
684 "uri": "/v2/{name=projects/*/locations/*/queues/*}:resume",
685 "body": "*",
686 },
687 ]
688 return http_options
689
690 @staticmethod
691 def _get_transcoded_request(http_options, request):
692 pb_request = cloudtasks.ResumeQueueRequest.pb(request)
693 transcoded_request = path_template.transcode(http_options, pb_request)
694 return transcoded_request
695
696 @staticmethod
697 def _get_request_body_json(transcoded_request):
698 # Jsonify the request body
699
700 body = json_format.MessageToJson(
701 transcoded_request["body"], use_integers_for_enums=True
702 )
703 return body
704
705 @staticmethod
706 def _get_query_params_json(transcoded_request):
707 query_params = json.loads(
708 json_format.MessageToJson(
709 transcoded_request["query_params"],
710 use_integers_for_enums=True,
711 )
712 )
713 query_params.update(
714 _BaseCloudTasksRestTransport._BaseResumeQueue._get_unset_required_fields(
715 query_params
716 )
717 )
718
719 query_params["$alt"] = "json;enum-encoding=int"
720 return query_params
721
722 class _BaseRunTask:
723 def __hash__(self): # pragma: NO COVER
724 return NotImplementedError("__hash__ must be implemented.")
725
726 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
727
728 @classmethod
729 def _get_unset_required_fields(cls, message_dict):
730 return {
731 k: v
732 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
733 if k not in message_dict
734 }
735
736 @staticmethod
737 def _get_http_options():
738 http_options: List[Dict[str, str]] = [
739 {
740 "method": "post",
741 "uri": "/v2/{name=projects/*/locations/*/queues/*/tasks/*}:run",
742 "body": "*",
743 },
744 ]
745 return http_options
746
747 @staticmethod
748 def _get_transcoded_request(http_options, request):
749 pb_request = cloudtasks.RunTaskRequest.pb(request)
750 transcoded_request = path_template.transcode(http_options, pb_request)
751 return transcoded_request
752
753 @staticmethod
754 def _get_request_body_json(transcoded_request):
755 # Jsonify the request body
756
757 body = json_format.MessageToJson(
758 transcoded_request["body"], use_integers_for_enums=True
759 )
760 return body
761
762 @staticmethod
763 def _get_query_params_json(transcoded_request):
764 query_params = json.loads(
765 json_format.MessageToJson(
766 transcoded_request["query_params"],
767 use_integers_for_enums=True,
768 )
769 )
770 query_params.update(
771 _BaseCloudTasksRestTransport._BaseRunTask._get_unset_required_fields(
772 query_params
773 )
774 )
775
776 query_params["$alt"] = "json;enum-encoding=int"
777 return query_params
778
779 class _BaseSetIamPolicy:
780 def __hash__(self): # pragma: NO COVER
781 return NotImplementedError("__hash__ must be implemented.")
782
783 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
784
785 @classmethod
786 def _get_unset_required_fields(cls, message_dict):
787 return {
788 k: v
789 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
790 if k not in message_dict
791 }
792
793 @staticmethod
794 def _get_http_options():
795 http_options: List[Dict[str, str]] = [
796 {
797 "method": "post",
798 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:setIamPolicy",
799 "body": "*",
800 },
801 ]
802 return http_options
803
804 @staticmethod
805 def _get_transcoded_request(http_options, request):
806 pb_request = request
807 transcoded_request = path_template.transcode(http_options, pb_request)
808 return transcoded_request
809
810 @staticmethod
811 def _get_request_body_json(transcoded_request):
812 # Jsonify the request body
813
814 body = json_format.MessageToJson(
815 transcoded_request["body"], use_integers_for_enums=True
816 )
817 return body
818
819 @staticmethod
820 def _get_query_params_json(transcoded_request):
821 query_params = json.loads(
822 json_format.MessageToJson(
823 transcoded_request["query_params"],
824 use_integers_for_enums=True,
825 )
826 )
827 query_params.update(
828 _BaseCloudTasksRestTransport._BaseSetIamPolicy._get_unset_required_fields(
829 query_params
830 )
831 )
832
833 query_params["$alt"] = "json;enum-encoding=int"
834 return query_params
835
836 class _BaseTestIamPermissions:
837 def __hash__(self): # pragma: NO COVER
838 return NotImplementedError("__hash__ must be implemented.")
839
840 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
841
842 @classmethod
843 def _get_unset_required_fields(cls, message_dict):
844 return {
845 k: v
846 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
847 if k not in message_dict
848 }
849
850 @staticmethod
851 def _get_http_options():
852 http_options: List[Dict[str, str]] = [
853 {
854 "method": "post",
855 "uri": "/v2/{resource=projects/*/locations/*/queues/*}:testIamPermissions",
856 "body": "*",
857 },
858 ]
859 return http_options
860
861 @staticmethod
862 def _get_transcoded_request(http_options, request):
863 pb_request = request
864 transcoded_request = path_template.transcode(http_options, pb_request)
865 return transcoded_request
866
867 @staticmethod
868 def _get_request_body_json(transcoded_request):
869 # Jsonify the request body
870
871 body = json_format.MessageToJson(
872 transcoded_request["body"], use_integers_for_enums=True
873 )
874 return body
875
876 @staticmethod
877 def _get_query_params_json(transcoded_request):
878 query_params = json.loads(
879 json_format.MessageToJson(
880 transcoded_request["query_params"],
881 use_integers_for_enums=True,
882 )
883 )
884 query_params.update(
885 _BaseCloudTasksRestTransport._BaseTestIamPermissions._get_unset_required_fields(
886 query_params
887 )
888 )
889
890 query_params["$alt"] = "json;enum-encoding=int"
891 return query_params
892
893 class _BaseUpdateQueue:
894 def __hash__(self): # pragma: NO COVER
895 return NotImplementedError("__hash__ must be implemented.")
896
897 __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}
898
899 @classmethod
900 def _get_unset_required_fields(cls, message_dict):
901 return {
902 k: v
903 for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
904 if k not in message_dict
905 }
906
907 @staticmethod
908 def _get_http_options():
909 http_options: List[Dict[str, str]] = [
910 {
911 "method": "patch",
912 "uri": "/v2/{queue.name=projects/*/locations/*/queues/*}",
913 "body": "queue",
914 },
915 ]
916 return http_options
917
918 @staticmethod
919 def _get_transcoded_request(http_options, request):
920 pb_request = cloudtasks.UpdateQueueRequest.pb(request)
921 transcoded_request = path_template.transcode(http_options, pb_request)
922 return transcoded_request
923
924 @staticmethod
925 def _get_request_body_json(transcoded_request):
926 # Jsonify the request body
927
928 body = json_format.MessageToJson(
929 transcoded_request["body"], use_integers_for_enums=True
930 )
931 return body
932
933 @staticmethod
934 def _get_query_params_json(transcoded_request):
935 query_params = json.loads(
936 json_format.MessageToJson(
937 transcoded_request["query_params"],
938 use_integers_for_enums=True,
939 )
940 )
941 query_params.update(
942 _BaseCloudTasksRestTransport._BaseUpdateQueue._get_unset_required_fields(
943 query_params
944 )
945 )
946
947 query_params["$alt"] = "json;enum-encoding=int"
948 return query_params
949
950 class _BaseGetLocation:
951 def __hash__(self): # pragma: NO COVER
952 return NotImplementedError("__hash__ must be implemented.")
953
954 @staticmethod
955 def _get_http_options():
956 http_options: List[Dict[str, str]] = [
957 {
958 "method": "get",
959 "uri": "/v2/{name=projects/*/locations/*}",
960 },
961 ]
962 return http_options
963
964 @staticmethod
965 def _get_transcoded_request(http_options, request):
966 request_kwargs = json_format.MessageToDict(request)
967 transcoded_request = path_template.transcode(http_options, **request_kwargs)
968 return transcoded_request
969
970 @staticmethod
971 def _get_query_params_json(transcoded_request):
972 query_params = json.loads(json.dumps(transcoded_request["query_params"]))
973 return query_params
974
975 class _BaseListLocations:
976 def __hash__(self): # pragma: NO COVER
977 return NotImplementedError("__hash__ must be implemented.")
978
979 @staticmethod
980 def _get_http_options():
981 http_options: List[Dict[str, str]] = [
982 {
983 "method": "get",
984 "uri": "/v2/{name=projects/*}/locations",
985 },
986 ]
987 return http_options
988
989 @staticmethod
990 def _get_transcoded_request(http_options, request):
991 request_kwargs = json_format.MessageToDict(request)
992 transcoded_request = path_template.transcode(http_options, **request_kwargs)
993 return transcoded_request
994
995 @staticmethod
996 def _get_query_params_json(transcoded_request):
997 query_params = json.loads(json.dumps(transcoded_request["query_params"]))
998 return query_params
999
1000
1001__all__ = ("_BaseCloudTasksRestTransport",)