1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json # type: ignore
17from google.api_core import path_template
18from google.api_core import gapic_v1
19
20from google.protobuf import json_format
21from google.iam.v1 import iam_policy_pb2 # type: ignore
22from google.iam.v1 import policy_pb2 # type: ignore
23from .base import PublisherTransport, DEFAULT_CLIENT_INFO
24
25import re
26from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
27
28
29from google.protobuf import empty_pb2 # type: ignore
30from google.pubsub_v1.types import pubsub
31
32
class _BasePublisherRestTransport(PublisherTransport):
    """Base REST backend transport for Publisher.

    Note: This class is not meant to be used directly. Use its sync and
    async sub-classes instead.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends JSON representations of protocol buffers over HTTP/1.1

    Each nested ``_Base<Method>`` class groups the helpers used to build
    the HTTP request for one method: the HTTP rules
    (``_get_http_options``), the transcoded request
    (``_get_transcoded_request``), the query parameters
    (``_get_query_params_json``) and, for methods with a body, the JSON
    body (``_get_request_body_json``).
    """

    def __init__(
        self,
        *,
        host: str = "pubsub.googleapis.com",
        credentials: Optional[Any] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme: str = "https",
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'pubsub.googleapis.com').
            credentials (Optional[Any]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you are developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor.

        Raises:
            ValueError: If ``host`` does not match the expected
                ``[http(s)://]host`` structure.
        """
        maybe_url_match = re.match(r"^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
        if maybe_url_match is None:
            raise ValueError(
                f"Unexpected hostname structure: {host}"
            )  # pragma: NO COVER

        url_match_items = maybe_url_match.groupdict()

        # Only prepend ``url_scheme`` when the caller did not already
        # include an explicit http:// or https:// prefix.
        if not url_match_items["scheme"]:
            host = f"{url_scheme}://{host}"

        # Run the base constructor
        super().__init__(
            host=host,
            credentials=credentials,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

    class _BaseCreateTopic:
        """Request-building helpers for the ``CreateTopic`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method, URI template, body) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "put",
                    "uri": "/v1/{name=projects/*/topics/*}",
                    "body": "*",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``pubsub.Topic`` protobuf) against ``http_options``."""
            pb_request = pubsub.Topic.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_request_body_json(transcoded_request):
            """Serialize the transcoded ``body`` to a JSON string with integer enums."""
            # Jsonify the request body
            body = json_format.MessageToJson(
                transcoded_request["body"], use_integers_for_enums=True
            )
            return body

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseCreateTopic._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseDeleteTopic:
        """Request-building helpers for the ``DeleteTopic`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "delete",
                    "uri": "/v1/{topic=projects/*/topics/*}",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``DeleteTopicRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.DeleteTopicRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseDeleteTopic._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseDetachSubscription:
        """Request-building helpers for the ``DetachSubscription`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "post",
                    "uri": "/v1/{subscription=projects/*/subscriptions/*}:detach",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``DetachSubscriptionRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.DetachSubscriptionRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseDetachSubscription._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseGetTopic:
        """Request-building helpers for the ``GetTopic`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "get",
                    "uri": "/v1/{topic=projects/*/topics/*}",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``GetTopicRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.GetTopicRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseGetTopic._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseListTopics:
        """Request-building helpers for the ``ListTopics`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "get",
                    "uri": "/v1/{project=projects/*}/topics",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``ListTopicsRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.ListTopicsRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseListTopics._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseListTopicSnapshots:
        """Request-building helpers for the ``ListTopicSnapshots`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "get",
                    "uri": "/v1/{topic=projects/*/topics/*}/snapshots",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``ListTopicSnapshotsRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.ListTopicSnapshotsRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseListTopicSnapshots._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseListTopicSubscriptions:
        """Request-building helpers for the ``ListTopicSubscriptions`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method and URI template) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "get",
                    "uri": "/v1/{topic=projects/*/topics/*}/subscriptions",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``ListTopicSubscriptionsRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.ListTopicSubscriptionsRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseListTopicSubscriptions._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BasePublish:
        """Request-building helpers for the ``Publish`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method, URI template, body) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "post",
                    "uri": "/v1/{topic=projects/*/topics/*}:publish",
                    "body": "*",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as a ``PublishRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.PublishRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_request_body_json(transcoded_request):
            """Serialize the transcoded ``body`` to a JSON string with integer enums."""
            # Jsonify the request body
            body = json_format.MessageToJson(
                transcoded_request["body"], use_integers_for_enums=True
            )
            return body

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BasePublish._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseUpdateTopic:
        """Request-building helpers for the ``UpdateTopic`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        __REQUIRED_FIELDS_DEFAULT_VALUES: Dict[str, Any] = {}

        @classmethod
        def _get_unset_required_fields(cls, message_dict):
            """Return required-field defaults whose keys are missing from ``message_dict``."""
            return {
                k: v
                for k, v in cls.__REQUIRED_FIELDS_DEFAULT_VALUES.items()
                if k not in message_dict
            }

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules (method, URI template, body) for this method."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "patch",
                    "uri": "/v1/{topic.name=projects/*/topics/*}",
                    "body": "*",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Transcode ``request`` (as an ``UpdateTopicRequest`` protobuf) against ``http_options``."""
            pb_request = pubsub.UpdateTopicRequest.pb(request)
            transcoded_request = path_template.transcode(http_options, pb_request)
            return transcoded_request

        @staticmethod
        def _get_request_body_json(transcoded_request):
            """Serialize the transcoded ``body`` to a JSON string with integer enums."""
            # Jsonify the request body
            body = json_format.MessageToJson(
                transcoded_request["body"], use_integers_for_enums=True
            )
            return body

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return the query parameters as a dict.

            The transcoded ``query_params`` message is converted to a dict
            via JSON, any unset required-field defaults are merged in, and
            ``$alt`` is set so enums are encoded as integers.
            """
            query_params = json.loads(
                json_format.MessageToJson(
                    transcoded_request["query_params"],
                    use_integers_for_enums=True,
                )
            )
            query_params.update(
                _BasePublisherRestTransport._BaseUpdateTopic._get_unset_required_fields(
                    query_params
                )
            )

            query_params["$alt"] = "json;enum-encoding=int"
            return query_params

    class _BaseGetIamPolicy:
        """Request-building helpers for the ``GetIamPolicy`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules, one per supported resource URI pattern."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "get",
                    "uri": "/v1/{resource=projects/*/topics/*}:getIamPolicy",
                },
                {
                    "method": "get",
                    "uri": "/v1/{resource=projects/*/subscriptions/*}:getIamPolicy",
                },
                {
                    "method": "get",
                    "uri": "/v1/{resource=projects/*/snapshots/*}:getIamPolicy",
                },
                {
                    "method": "get",
                    "uri": "/v1/{resource=projects/*/schemas/*}:getIamPolicy",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Convert ``request`` to a dict and transcode its fields against ``http_options``."""
            request_kwargs = json_format.MessageToDict(request)
            transcoded_request = path_template.transcode(http_options, **request_kwargs)
            return transcoded_request

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return a JSON round-tripped copy of the transcoded ``query_params``."""
            query_params = json.loads(json.dumps(transcoded_request["query_params"]))
            return query_params

    class _BaseSetIamPolicy:
        """Request-building helpers for the ``SetIamPolicy`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules, one per supported resource URI pattern."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/topics/*}:setIamPolicy",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/subscriptions/*}:setIamPolicy",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/snapshots/*}:setIamPolicy",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/schemas/*}:setIamPolicy",
                    "body": "*",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Convert ``request`` to a dict and transcode its fields against ``http_options``."""
            request_kwargs = json_format.MessageToDict(request)
            transcoded_request = path_template.transcode(http_options, **request_kwargs)
            return transcoded_request

        @staticmethod
        def _get_request_body_json(transcoded_request):
            """Serialize the transcoded ``body`` to a JSON string."""
            body = json.dumps(transcoded_request["body"])
            return body

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return a JSON round-tripped copy of the transcoded ``query_params``."""
            query_params = json.loads(json.dumps(transcoded_request["query_params"]))
            return query_params

    class _BaseTestIamPermissions:
        """Request-building helpers for the ``TestIamPermissions`` method."""

        def __hash__(self):  # pragma: NO COVER
            # Raise (not return) so a missing override fails loudly.
            raise NotImplementedError("__hash__ must be implemented.")

        @staticmethod
        def _get_http_options():
            """Return the list of HTTP rules, one per supported resource URI pattern."""
            http_options: List[Dict[str, str]] = [
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/subscriptions/*}:testIamPermissions",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/topics/*}:testIamPermissions",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/snapshots/*}:testIamPermissions",
                    "body": "*",
                },
                {
                    "method": "post",
                    "uri": "/v1/{resource=projects/*/schemas/*}:testIamPermissions",
                    "body": "*",
                },
            ]
            return http_options

        @staticmethod
        def _get_transcoded_request(http_options, request):
            """Convert ``request`` to a dict and transcode its fields against ``http_options``."""
            request_kwargs = json_format.MessageToDict(request)
            transcoded_request = path_template.transcode(http_options, **request_kwargs)
            return transcoded_request

        @staticmethod
        def _get_request_body_json(transcoded_request):
            """Serialize the transcoded ``body`` to a JSON string."""
            body = json.dumps(transcoded_request["body"])
            return body

        @staticmethod
        def _get_query_params_json(transcoded_request):
            """Return a JSON round-tripped copy of the transcoded ``query_params``."""
            query_params = json.loads(json.dumps(transcoded_request["query_params"]))
            return query_params
676
677
# Explicit export list: the base transport class is the only name exposed
# through ``from <module> import *``.
__all__ = (
    "_BasePublisherRestTransport",
)