Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/rest.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

450 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31 

32from requests import __version__ as requests_version 

33import dataclasses 

34from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

35import warnings 

36 

37 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40 

41 

42from .rest_base import _BasePublisherRestTransport 

43from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

44 

# Alias for the ``retry`` argument accepted by the RPC stubs in this module.
# ``gapic_v1.method._MethodDefault`` is resolved by attribute access; when that
# lookup raises AttributeError, ``object`` is substituted for it.
try:
    OptionalRetry = Optional[Union[retries.Retry, gapic_v1.method._MethodDefault]]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Optional[Union[retries.Retry, object]]  # type: ignore

# Record whether ``google.api_core.client_logging`` can be imported; the stubs
# consult this flag before emitting their DEBUG-level request/response logs.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger shared by every stub defined in this file.
_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: it reuses the base transport's GAPIC
# version, sets no gRPC version, and reports the installed ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    rest_version=f"requests@{requests_version}",
    grpc_version=None,
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
)

# Only populate the protobuf runtime version when ClientInfo exposes the field.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

67 

68 

class PublisherRestInterceptor:
    """Interceptor for Publisher.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:
    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the PublisherRestTransport.

    Every hook defined on this base class is a pass-through: ``pre_*`` hooks
    return ``(request, metadata)`` unchanged, ``post_*`` hooks return the
    response unchanged, and ``post_*_with_metadata`` hooks return
    ``(response, metadata)`` unchanged.

    .. code-block:: python

        class MyCustomPublisherInterceptor(PublisherRestInterceptor):
            def pre_create_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_create_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_delete_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def pre_detach_subscription(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_detach_subscription(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_get_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_get_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topics(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topics(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topic_snapshots(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topic_snapshots(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topic_subscriptions(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topic_subscriptions(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_publish(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_publish(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_update_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_update_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = PublisherRestTransport(interceptor=MyCustomPublisherInterceptor())
        client = PublisherClient(transport=transport)


    """

    def pre_create_topic(
        self, request: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for create_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_create_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for create_topic

        DEPRECATED. Please use the `post_create_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_create_topic` interceptor runs
        before the `post_create_topic_with_metadata` interceptor.
        """
        return response

    def post_create_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for create_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_create_topic_with_metadata`
        interceptor in new development instead of the `post_create_topic` interceptor.
        When both interceptors are used, this `post_create_topic_with_metadata` interceptor runs after the
        `post_create_topic` interceptor. The (possibly modified) response returned by
        `post_create_topic` will be passed to
        `post_create_topic_with_metadata`.
        """
        return response, metadata

    def pre_delete_topic(
        self,
        request: pubsub.DeleteTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.DeleteTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for delete_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def pre_detach_subscription(
        self,
        request: pubsub.DetachSubscriptionRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.DetachSubscriptionRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for detach_subscription

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_detach_subscription(
        self, response: pubsub.DetachSubscriptionResponse
    ) -> pubsub.DetachSubscriptionResponse:
        """Post-rpc interceptor for detach_subscription

        DEPRECATED. Please use the `post_detach_subscription_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_detach_subscription` interceptor runs
        before the `post_detach_subscription_with_metadata` interceptor.
        """
        return response

    def post_detach_subscription_with_metadata(
        self,
        response: pubsub.DetachSubscriptionResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.DetachSubscriptionResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for detach_subscription

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_detach_subscription_with_metadata`
        interceptor in new development instead of the `post_detach_subscription` interceptor.
        When both interceptors are used, this `post_detach_subscription_with_metadata` interceptor runs after the
        `post_detach_subscription` interceptor. The (possibly modified) response returned by
        `post_detach_subscription` will be passed to
        `post_detach_subscription_with_metadata`.
        """
        return response, metadata

    def pre_get_topic(
        self,
        request: pubsub.GetTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.GetTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for get_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_get_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for get_topic

        DEPRECATED. Please use the `post_get_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_get_topic` interceptor runs
        before the `post_get_topic_with_metadata` interceptor.
        """
        return response

    def post_get_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for get_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_get_topic_with_metadata`
        interceptor in new development instead of the `post_get_topic` interceptor.
        When both interceptors are used, this `post_get_topic_with_metadata` interceptor runs after the
        `post_get_topic` interceptor. The (possibly modified) response returned by
        `post_get_topic` will be passed to
        `post_get_topic_with_metadata`.
        """
        return response, metadata

    def pre_list_topics(
        self,
        request: pubsub.ListTopicsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.ListTopicsRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for list_topics

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_list_topics(
        self, response: pubsub.ListTopicsResponse
    ) -> pubsub.ListTopicsResponse:
        """Post-rpc interceptor for list_topics

        DEPRECATED. Please use the `post_list_topics_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topics` interceptor runs
        before the `post_list_topics_with_metadata` interceptor.
        """
        return response

    def post_list_topics_with_metadata(
        self,
        response: pubsub.ListTopicsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.ListTopicsResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for list_topics

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topics_with_metadata`
        interceptor in new development instead of the `post_list_topics` interceptor.
        When both interceptors are used, this `post_list_topics_with_metadata` interceptor runs after the
        `post_list_topics` interceptor. The (possibly modified) response returned by
        `post_list_topics` will be passed to
        `post_list_topics_with_metadata`.
        """
        return response, metadata

    def pre_list_topic_snapshots(
        self,
        request: pubsub.ListTopicSnapshotsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSnapshotsRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for list_topic_snapshots

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_list_topic_snapshots(
        self, response: pubsub.ListTopicSnapshotsResponse
    ) -> pubsub.ListTopicSnapshotsResponse:
        """Post-rpc interceptor for list_topic_snapshots

        DEPRECATED. Please use the `post_list_topic_snapshots_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topic_snapshots` interceptor runs
        before the `post_list_topic_snapshots_with_metadata` interceptor.
        """
        return response

    def post_list_topic_snapshots_with_metadata(
        self,
        response: pubsub.ListTopicSnapshotsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSnapshotsResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for list_topic_snapshots

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topic_snapshots_with_metadata`
        interceptor in new development instead of the `post_list_topic_snapshots` interceptor.
        When both interceptors are used, this `post_list_topic_snapshots_with_metadata` interceptor runs after the
        `post_list_topic_snapshots` interceptor. The (possibly modified) response returned by
        `post_list_topic_snapshots` will be passed to
        `post_list_topic_snapshots_with_metadata`.
        """
        return response, metadata

    def pre_list_topic_subscriptions(
        self,
        request: pubsub.ListTopicSubscriptionsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSubscriptionsRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for list_topic_subscriptions

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_list_topic_subscriptions(
        self, response: pubsub.ListTopicSubscriptionsResponse
    ) -> pubsub.ListTopicSubscriptionsResponse:
        """Post-rpc interceptor for list_topic_subscriptions

        DEPRECATED. Please use the `post_list_topic_subscriptions_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topic_subscriptions` interceptor runs
        before the `post_list_topic_subscriptions_with_metadata` interceptor.
        """
        return response

    def post_list_topic_subscriptions_with_metadata(
        self,
        response: pubsub.ListTopicSubscriptionsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSubscriptionsResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for list_topic_subscriptions

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topic_subscriptions_with_metadata`
        interceptor in new development instead of the `post_list_topic_subscriptions` interceptor.
        When both interceptors are used, this `post_list_topic_subscriptions_with_metadata` interceptor runs after the
        `post_list_topic_subscriptions` interceptor. The (possibly modified) response returned by
        `post_list_topic_subscriptions` will be passed to
        `post_list_topic_subscriptions_with_metadata`.
        """
        return response, metadata

    def pre_publish(
        self,
        request: pubsub.PublishRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.PublishRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for publish

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_publish(self, response: pubsub.PublishResponse) -> pubsub.PublishResponse:
        """Post-rpc interceptor for publish

        DEPRECATED. Please use the `post_publish_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_publish` interceptor runs
        before the `post_publish_with_metadata` interceptor.
        """
        return response

    def post_publish_with_metadata(
        self,
        response: pubsub.PublishResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.PublishResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for publish

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_publish_with_metadata`
        interceptor in new development instead of the `post_publish` interceptor.
        When both interceptors are used, this `post_publish_with_metadata` interceptor runs after the
        `post_publish` interceptor. The (possibly modified) response returned by
        `post_publish` will be passed to
        `post_publish_with_metadata`.
        """
        return response, metadata

    def pre_update_topic(
        self,
        request: pubsub.UpdateTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.UpdateTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for update_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_update_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for update_topic

        DEPRECATED. Please use the `post_update_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_update_topic` interceptor runs
        before the `post_update_topic_with_metadata` interceptor.
        """
        return response

    def post_update_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for update_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_update_topic_with_metadata`
        interceptor in new development instead of the `post_update_topic` interceptor.
        When both interceptors are used, this `post_update_topic_with_metadata` interceptor runs after the
        `post_update_topic` interceptor. The (possibly modified) response returned by
        `post_update_topic` will be passed to
        `post_update_topic_with_metadata`.
        """
        return response, metadata

    def pre_get_iam_policy(
        self,
        request: iam_policy_pb2.GetIamPolicyRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.GetIamPolicyRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for get_iam_policy

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_get_iam_policy(self, response: policy_pb2.Policy) -> policy_pb2.Policy:
        """Post-rpc interceptor for get_iam_policy

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.
        """
        return response

    def pre_set_iam_policy(
        self,
        request: iam_policy_pb2.SetIamPolicyRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.SetIamPolicyRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for set_iam_policy

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_set_iam_policy(self, response: policy_pb2.Policy) -> policy_pb2.Policy:
        """Post-rpc interceptor for set_iam_policy

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.
        """
        return response

    def pre_test_iam_permissions(
        self,
        request: iam_policy_pb2.TestIamPermissionsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.TestIamPermissionsRequest,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Pre-rpc interceptor for test_iam_permissions

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.
        """
        return request, metadata

    def post_test_iam_permissions(
        self, response: iam_policy_pb2.TestIamPermissionsResponse
    ) -> iam_policy_pb2.TestIamPermissionsResponse:
        """Post-rpc interceptor for test_iam_permissions

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.
        """
        return response

605 

606 

@dataclasses.dataclass()
class PublisherRestStub:
    # State shared by every per-RPC stub class that inherits from this one.
    # Authorized HTTP session used to send the request.
    _session: AuthorizedSession
    # Host prefix that is concatenated with the transcoded URI.
    _host: str
    # Interceptor whose pre_*/post_* hooks wrap each call.
    _interceptor: PublisherRestInterceptor

612 

613 

614class PublisherRestTransport(_BasePublisherRestTransport): 

615 """REST backend synchronous transport for Publisher. 

616 

617 The service that an application uses to manipulate topics, 

618 and to send messages to a topic. 

619 

620 This class defines the same methods as the primary client, so the 

621 primary client can load the underlying transport implementation 

622 and call it. 

623 

624 It sends JSON representations of protocol buffers over HTTP/1.1 

625 """ 

626 

def __init__(
    self,
    *,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    interceptor: Optional[PublisherRestInterceptor] = None,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'pubsub.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        credentials_file (Optional[str]): Deprecated. A file with credentials
            that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument
            will be removed in the next major version of this library.
            It is not referenced by this constructor's body.
        scopes (Optional(Sequence[str])): A list of scopes. It is not
            referenced by this constructor's body.
        client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
            certificate to configure mutual TLS HTTP channel. When truthy,
            it is handed to ``configure_mtls_channel`` on the session.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota. It is not referenced by this constructor's
            body.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you are developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        url_scheme: the protocol scheme for the API endpoint. Normally
            "https", but for testing or local servers,
            "http" can be specified.
        interceptor (Optional[PublisherRestInterceptor]): Interceptor whose
            hooks wrap each RPC. When falsy, a default
            ``PublisherRestInterceptor()`` is used.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.
    """
    # Run the base constructor
    # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
    # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
    # credentials object
    base_kwargs = dict(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        url_scheme=url_scheme,
        api_audience=api_audience,
    )
    super().__init__(**base_kwargs)

    # The session is built from the credentials attribute left on ``self``
    # by the base constructor.
    self._session = AuthorizedSession(
        self._credentials,
        default_host=self.DEFAULT_HOST,
    )
    if client_cert_source_for_mtls:
        self._session.configure_mtls_channel(client_cert_source_for_mtls)

    self._interceptor = interceptor if interceptor else PublisherRestInterceptor()
    self._prep_wrapped_messages(client_info)

694 

class _CreateTopic(_BasePublisherRestTransport._BaseCreateTopic, PublisherRestStub):
    """Callable REST stub that performs the CreateTopic RPC over HTTP."""

    def __hash__(self):
        # Hash a fixed label rather than instance state.
        return hash("PublisherRestTransport.CreateTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix concatenated with the transcoded URI to form the URL.
            metadata: Key/value pairs converted to a dict and sent as headers.
            query_params: Query parameters; flattened (strict mode) before sending.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing at least ``"uri"`` and ``"method"``.
            body: Optional request payload passed as ``data``.

        Returns:
            Whatever the session's HTTP-method callable returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        # The HTTP verb is looked up dynamically on the session (e.g. session.put).
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: pubsub.Topic,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the create topic method over HTTP.

        Args:
            request (~.pubsub.Topic):
                The request object. A topic resource.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. This argument is not consulted inside this
                method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: The matching
                subclass, when the HTTP status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseCreateTopic._get_http_options()
        )

        request, metadata = self._interceptor.pre_create_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseCreateTopic._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseCreateTopic._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseCreateTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Best-effort serialization: logging must never break the RPC,
            # but interpreter-exit signals must still propagate.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.CreateTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "CreateTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._CreateTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        pb_resp = pubsub.Topic.pb(resp)

        # Parsing fills ``pb_resp``, the protobuf obtained from ``resp`` above.
        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # The deprecated hook runs first; its result feeds the metadata-aware hook.
        resp = self._interceptor.post_create_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_create_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed Topic (``resp``), not the raw HTTP response
            # object, so the logged payload is actually populated.
            try:
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.create_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "CreateTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

846 

class _DeleteTopic(_BasePublisherRestTransport._BaseDeleteTopic, PublisherRestStub):
    """REST stub that performs the ``DeleteTopic`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.DeleteTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.DeleteTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> None:
        r"""Call the delete topic method over HTTP.

        Args:
            request (~.pubsub.DeleteTopicRequest):
                The request object. Request for the ``DeleteTopic`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_http_options()
        )

        request, metadata = self._interceptor.pre_delete_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: serialize the request the same way the
            # sibling stubs do, fall back to plain protobuf serialization, and
            # log no payload at all rather than fail the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                try:
                    request_payload = json_format.MessageToJson(request)
                except Exception:
                    request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.DeleteTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DeleteTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._DeleteTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

954 

class _DetachSubscription(
    _BasePublisherRestTransport._BaseDetachSubscription, PublisherRestStub
):
    """REST stub that performs the ``DetachSubscription`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.DetachSubscription")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.DetachSubscriptionRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.DetachSubscriptionResponse:
        r"""Call the detach subscription method over HTTP.

        Args:
            request (~.pubsub.DetachSubscriptionRequest):
                The request object. Request for the DetachSubscription
                method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.DetachSubscriptionResponse:
                Response for the DetachSubscription
                method. Reserved for future use.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseDetachSubscription._get_http_options()
        )

        request, metadata = self._interceptor.pre_detach_subscription(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseDetachSubscription._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseDetachSubscription._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.DetachSubscription",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DetachSubscription",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._DetachSubscription._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.DetachSubscriptionResponse()
        pb_resp = pubsub.DetachSubscriptionResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_detach_subscription(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_detach_subscription_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response`` object, which is not a DetachSubscriptionResponse.
            try:
                response_payload = pubsub.DetachSubscriptionResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.detach_subscription",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DetachSubscription",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1105 

class _GetTopic(_BasePublisherRestTransport._BaseGetTopic, PublisherRestStub):
    """REST stub that performs the ``GetTopic`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.GetTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.GetTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the get topic method over HTTP.

        Args:
            request (~.pubsub.GetTopicRequest):
                The request object. Request for the GetTopic method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = _BasePublisherRestTransport._BaseGetTopic._get_http_options()

        request, metadata = self._interceptor.pre_get_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseGetTopic._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseGetTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.GetTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._GetTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        pb_resp = pubsub.Topic.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_get_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_get_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response`` object, which is not a Topic.
            try:
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.get_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1249 

class _ListTopics(_BasePublisherRestTransport._BaseListTopics, PublisherRestStub):
    """REST stub that performs the ``ListTopics`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopics")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicsResponse:
        r"""Call the list topics method over HTTP.

        Args:
            request (~.pubsub.ListTopicsRequest):
                The request object. Request for the ``ListTopics`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicsResponse:
                Response for the ``ListTopics`` method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopics._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topics(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseListTopics._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseListTopics._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopics",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopics",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopics._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicsResponse()
        pb_resp = pubsub.ListTopicsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topics(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topics_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response`` object, which is not a ListTopicsResponse.
            try:
                response_payload = pubsub.ListTopicsResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topics",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopics",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1395 

class _ListTopicSnapshots(
    _BasePublisherRestTransport._BaseListTopicSnapshots, PublisherRestStub
):
    """REST stub that performs the ``ListTopicSnapshots`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopicSnapshots")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicSnapshotsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicSnapshotsResponse:
        r"""Call the list topic snapshots method over HTTP.

        Args:
            request (~.pubsub.ListTopicSnapshotsRequest):
                The request object. Request for the ``ListTopicSnapshots`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicSnapshotsResponse:
                Response for the ``ListTopicSnapshots`` method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopicSnapshots._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topic_snapshots(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseListTopicSnapshots._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseListTopicSnapshots._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopicSnapshots",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSnapshots",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopicSnapshots._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicSnapshotsResponse()
        pb_resp = pubsub.ListTopicSnapshotsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topic_snapshots(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topic_snapshots_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response`` object, which is not a ListTopicSnapshotsResponse.
            try:
                response_payload = pubsub.ListTopicSnapshotsResponse.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topic_snapshots",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSnapshots",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1543 

class _ListTopicSubscriptions(
    _BasePublisherRestTransport._BaseListTopicSubscriptions, PublisherRestStub
):
    """REST stub that performs the ``ListTopicSubscriptions`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopicSubscriptions")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the response.

        Args:
            host: Prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP method name
                (resolved with ``getattr(session, method)``).
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Unused by this stub; no request body is sent.

        Returns:
            The object returned by the session call.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicSubscriptionsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicSubscriptionsResponse:
        r"""Call the list topic subscriptions method over HTTP.

        Args:
            request (~.pubsub.ListTopicSubscriptionsRequest):
                The request object. Request for the ``ListTopicSubscriptions`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicSubscriptionsResponse:
                Response for the ``ListTopicSubscriptions`` method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code of the
                response is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopicSubscriptions._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topic_subscriptions(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseListTopicSubscriptions._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseListTopicSubscriptions._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialized is
            # logged without a payload instead of failing the RPC.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopicSubscriptions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSubscriptions",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopicSubscriptions._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicSubscriptionsResponse()
        pb_resp = pubsub.ListTopicSubscriptionsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topic_subscriptions(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topic_subscriptions_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response`` object, which is not a
            # ListTopicSubscriptionsResponse.
            try:
                response_payload = pubsub.ListTopicSubscriptionsResponse.to_json(
                    resp
                )
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topic_subscriptions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSubscriptions",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1691 

1692 class _Publish(_BasePublisherRestTransport._BasePublish, PublisherRestStub): 

1693 def __hash__(self): 

1694 return hash("PublisherRestTransport.Publish") 

1695 

1696 @staticmethod 

1697 def _get_response( 

1698 host, 

1699 metadata, 

1700 query_params, 

1701 session, 

1702 timeout, 

1703 transcoded_request, 

1704 body=None, 

1705 ): 

1706 uri = transcoded_request["uri"] 

1707 method = transcoded_request["method"] 

1708 headers = dict(metadata) 

1709 headers["Content-Type"] = "application/json" 

1710 response = getattr(session, method)( 

1711 "{host}{uri}".format(host=host, uri=uri), 

1712 timeout=timeout, 

1713 headers=headers, 

1714 params=rest_helpers.flatten_query_params(query_params, strict=True), 

1715 data=body, 

1716 ) 

1717 return response 

1718 

def __call__(
    self,
    request: pubsub.PublishRequest,
    *,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Optional[float] = None,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> pubsub.PublishResponse:
    r"""Call the publish method over HTTP.

    Args:
        request (~.pubsub.PublishRequest):
            The request object. Request for the Publish method.
        retry (google.api_core.retry.Retry): Designation of what errors, if any,
            should be retried. Accepted for interface compatibility; this
            method body does not read it.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        ~.pubsub.PublishResponse:
            Response for the ``Publish`` method.

    Raises:
        core_exceptions.GoogleAPICallError: If the HTTP status code is 400
            or greater (the concrete subclass is chosen by
            ``core_exceptions.from_http_response``).
    """

    http_options = _BasePublisherRestTransport._BasePublish._get_http_options()

    # Give the interceptor a chance to rewrite the request and metadata
    # before anything is transcoded.
    request, metadata = self._interceptor.pre_publish(request, metadata)
    transcoded_request = (
        _BasePublisherRestTransport._BasePublish._get_transcoded_request(
            http_options, request
        )
    )

    body = _BasePublisherRestTransport._BasePublish._get_request_body_json(
        transcoded_request
    )

    # Jsonify the query params
    query_params = (
        _BasePublisherRestTransport._BasePublish._get_query_params_json(
            transcoded_request
        )
    )

    if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
        logging.DEBUG
    ):  # pragma: NO COVER
        request_url = "{host}{uri}".format(
            host=self._host, uri=transcoded_request["uri"]
        )
        method = transcoded_request["method"]
        # Logging is best-effort: a payload that cannot be serialized is
        # logged as None instead of failing the call.
        try:
            request_payload = type(request).to_json(request)
        except Exception:
            request_payload = None
        http_request = {
            "payload": request_payload,
            "requestMethod": method,
            "requestUrl": request_url,
            "headers": dict(metadata),
        }
        _LOGGER.debug(
            "Sending request for google.pubsub_v1.PublisherClient.Publish",
            extra={
                "serviceName": "google.pubsub.v1.Publisher",
                "rpcName": "Publish",
                "httpRequest": http_request,
                "metadata": http_request["headers"],
            },
        )

    # Send the request
    response = PublisherRestTransport._Publish._get_response(
        self._host,
        metadata,
        query_params,
        self._session,
        timeout,
        transcoded_request,
        body,
    )

    # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
    # subclass.
    if response.status_code >= 400:
        raise core_exceptions.from_http_response(response)

    # Return the response
    resp = pubsub.PublishResponse()
    # Parse into the message obtained from ``.pb(resp)``; ``resp`` is what
    # gets handed to the interceptors and returned.
    pb_resp = pubsub.PublishResponse.pb(resp)

    json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

    resp = self._interceptor.post_publish(resp)
    response_metadata = [(k, str(v)) for k, v in response.headers.items()]
    resp, _ = self._interceptor.post_publish_with_metadata(
        resp, response_metadata
    )
    if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
        logging.DEBUG
    ):  # pragma: NO COVER
        # Serialize the parsed message (``resp``), not the raw HTTP
        # ``response``: the latter is not a PublishResponse, so the
        # serialization failed and the payload was always logged as None.
        try:
            response_payload = pubsub.PublishResponse.to_json(resp)
        except Exception:
            response_payload = None
        http_response = {
            "payload": response_payload,
            "headers": dict(response.headers),
            "status": response.status_code,
        }
        _LOGGER.debug(
            "Received response for google.pubsub_v1.PublisherClient.publish",
            extra={
                "serviceName": "google.pubsub.v1.Publisher",
                "rpcName": "Publish",
                "metadata": http_response["headers"],
                "httpResponse": http_response,
            },
        )
    return resp

1841 

class _UpdateTopic(_BasePublisherRestTransport._BaseUpdateTopic, PublisherRestStub):
    """REST stub for the UpdateTopic RPC: transcode, send, and parse one call."""

    def __hash__(self):
        # Hash a fixed per-RPC string, so all instances of this stub hash alike.
        return hash("PublisherRestTransport.UpdateTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Issue the transcoded HTTP request and return the raw response.

        Args:
            host: URL prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP verb, looked up
                by the transcoded method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Request payload, passed as ``data``.

        Returns:
            Whatever the session's verb callable returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: pubsub.UpdateTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the update topic method over HTTP.

        Args:
            request (~.pubsub.UpdateTopicRequest):
                The request object. Request for the UpdateTopic method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; this
                method body does not read it.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater (the concrete subclass is chosen by
                ``core_exceptions.from_http_response``).
        """

        http_options = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before anything is transcoded.
        request, metadata = self._interceptor.pre_update_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseUpdateTopic._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.UpdateTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "UpdateTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._UpdateTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        # Parse into the message obtained from ``.pb(resp)``; ``resp`` is
        # what gets handed to the interceptors and returned.
        pb_resp = pubsub.Topic.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_update_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_update_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the raw HTTP
            # ``response``: the latter is not a Topic, so the serialization
            # failed and the payload was always logged as None.
            try:
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.update_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "UpdateTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1993 

@property
def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
    """Build a ``_CreateTopic`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._CreateTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1999 

@property
def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]:
    """Build a ``_DeleteTopic`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._DeleteTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2005 

@property
def detach_subscription(
    self,
) -> Callable[
    [pubsub.DetachSubscriptionRequest], pubsub.DetachSubscriptionResponse
]:
    """Build a ``_DetachSubscription`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._DetachSubscription(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2015 

@property
def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
    """Build a ``_GetTopic`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._GetTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2021 

@property
def list_topics(
    self,
) -> Callable[[pubsub.ListTopicsRequest], pubsub.ListTopicsResponse]:
    """Build a ``_ListTopics`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._ListTopics(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2029 

@property
def list_topic_snapshots(
    self,
) -> Callable[
    [pubsub.ListTopicSnapshotsRequest], pubsub.ListTopicSnapshotsResponse
]:
    """Build a ``_ListTopicSnapshots`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._ListTopicSnapshots(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2039 

@property
def list_topic_subscriptions(
    self,
) -> Callable[
    [pubsub.ListTopicSubscriptionsRequest], pubsub.ListTopicSubscriptionsResponse
]:
    """Build a ``_ListTopicSubscriptions`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._ListTopicSubscriptions(
        self._session, self._host, self._interceptor
    )
    return stub  # type: ignore

2049 

@property
def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
    """Build a ``_Publish`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._Publish(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2055 

@property
def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
    """Build a ``_UpdateTopic`` callable from this transport's session, host and interceptor."""
    # mypy cannot match the stub object against the declared Callable type,
    # hence the ignore on the return.
    stub = self._UpdateTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2061 

@property
def get_iam_policy(self):
    """Build a ``_GetIamPolicy`` callable from this transport's session, host and interceptor."""
    stub = self._GetIamPolicy(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2065 

class _GetIamPolicy(
    _BasePublisherRestTransport._BaseGetIamPolicy, PublisherRestStub
):
    """REST stub for the GetIamPolicy RPC: transcode, send, and parse one call."""

    def __hash__(self):
        # Hash a fixed per-RPC string, so all instances of this stub hash alike.
        return hash("PublisherRestTransport.GetIamPolicy")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Issue the transcoded HTTP request and return the raw response.

        Args:
            host: URL prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP verb, looked up
                by the transcoded method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Accepted for signature parity with the other stubs; it is
                not sent by this method.

        Returns:
            Whatever the session's verb callable returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.GetIamPolicyRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> policy_pb2.Policy:
        r"""Call the get iam policy method over HTTP.

        Args:
            request (iam_policy_pb2.GetIamPolicyRequest):
                The request object for GetIamPolicy method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; this
                method body does not read it.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            policy_pb2.Policy: Response from GetIamPolicy method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater (the concrete subclass is chosen by
                ``core_exceptions.from_http_response``).
        """

        http_options = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before anything is transcoded.
        request, metadata = self._interceptor.pre_get_iam_policy(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.GetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetIamPolicy",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._GetIamPolicy._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = policy_pb2.Policy()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_get_iam_policy(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # This is the synchronous transport, so the message names
            # PublisherClient, matching the "Sending request" log above.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.GetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetIamPolicy",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2205 

@property
def set_iam_policy(self):
    """Build a ``_SetIamPolicy`` callable from this transport's session, host and interceptor."""
    stub = self._SetIamPolicy(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2209 

class _SetIamPolicy(
    _BasePublisherRestTransport._BaseSetIamPolicy, PublisherRestStub
):
    """REST stub for the SetIamPolicy RPC: transcode, send, and parse one call."""

    def __hash__(self):
        # Hash a fixed per-RPC string, so all instances of this stub hash alike.
        return hash("PublisherRestTransport.SetIamPolicy")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Issue the transcoded HTTP request and return the raw response.

        Args:
            host: URL prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP verb, looked up
                by the transcoded method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Request payload, passed as ``data``.

        Returns:
            Whatever the session's verb callable returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.SetIamPolicyRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> policy_pb2.Policy:
        r"""Call the set iam policy method over HTTP.

        Args:
            request (iam_policy_pb2.SetIamPolicyRequest):
                The request object for SetIamPolicy method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; this
                method body does not read it.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            policy_pb2.Policy: Response from SetIamPolicy method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater (the concrete subclass is chosen by
                ``core_exceptions.from_http_response``).
        """

        http_options = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before anything is transcoded.
        request, metadata = self._interceptor.pre_set_iam_policy(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseSetIamPolicy._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.SetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "SetIamPolicy",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._SetIamPolicy._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = policy_pb2.Policy()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_set_iam_policy(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # This is the synchronous transport, so the message names
            # PublisherClient, matching the "Sending request" log above.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.SetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "SetIamPolicy",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2355 

@property
def test_iam_permissions(self):
    """Build a ``_TestIamPermissions`` callable from this transport's session, host and interceptor."""
    stub = self._TestIamPermissions(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2359 

class _TestIamPermissions(
    _BasePublisherRestTransport._BaseTestIamPermissions, PublisherRestStub
):
    """REST stub for the TestIamPermissions RPC: transcode, send, and parse one call."""

    def __hash__(self):
        # Hash a fixed per-RPC string, so all instances of this stub hash alike.
        return hash("PublisherRestTransport.TestIamPermissions")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Issue the transcoded HTTP request and return the raw response.

        Args:
            host: URL prefix that the transcoded URI is appended to.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters; flattened before sending.
            session: Object exposing one callable per HTTP verb, looked up
                by the transcoded method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Request payload, passed as ``data``.

        Returns:
            Whatever the session's verb callable returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.TestIamPermissionsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> iam_policy_pb2.TestIamPermissionsResponse:
        r"""Call the test iam permissions method over HTTP.

        Args:
            request (iam_policy_pb2.TestIamPermissionsRequest):
                The request object for TestIamPermissions method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Accepted for interface compatibility; this
                method body does not read it.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            iam_policy_pb2.TestIamPermissionsResponse: Response from TestIamPermissions method.

        Raises:
            core_exceptions.GoogleAPICallError: If the HTTP status code is
                400 or greater (the concrete subclass is chosen by
                ``core_exceptions.from_http_response``).
        """

        http_options = (
            _BasePublisherRestTransport._BaseTestIamPermissions._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before anything is transcoded.
        request, metadata = self._interceptor.pre_test_iam_permissions(
            request, metadata
        )
        transcoded_request = (
            _BasePublisherRestTransport._BaseTestIamPermissions._get_transcoded_request(
                http_options, request
            )
        )

        body = (
            _BasePublisherRestTransport._BaseTestIamPermissions._get_request_body_json(
                transcoded_request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseTestIamPermissions._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a payload that cannot be serialized is
            # logged as None instead of failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.TestIamPermissions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "TestIamPermissions",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._TestIamPermissions._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = iam_policy_pb2.TestIamPermissionsResponse()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_test_iam_permissions(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # This is the synchronous transport, so the message names
            # PublisherClient, matching the "Sending request" log above.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.TestIamPermissions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "TestIamPermissions",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2503 

@property
def kind(self) -> str:
    """Name of this transport flavour; always the string ``"rest"``."""
    transport_kind = "rest"
    return transport_kind

2507 

def close(self):
    """Close the session object held by this transport."""
    session = self._session
    session.close()

2510 

2511 

# Names exported by ``from <this module> import *``: only the REST transport class.
__all__ = ("PublisherRestTransport",)