Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/publisher/transports/rest.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

450 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import logging 

17import json # type: ignore 

18 

19from google.auth.transport.requests import AuthorizedSession # type: ignore 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.api_core import exceptions as core_exceptions 

22from google.api_core import retry as retries 

23from google.api_core import rest_helpers 

24from google.api_core import rest_streaming 

25from google.api_core import gapic_v1 

26import google.protobuf 

27 

28from google.protobuf import json_format 

29from google.iam.v1 import iam_policy_pb2 # type: ignore 

30from google.iam.v1 import policy_pb2 # type: ignore 

31 

32from requests import __version__ as requests_version 

33import dataclasses 

34from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union 

35import warnings 

36 

37 

38from google.protobuf import empty_pb2 # type: ignore 

39from google.pubsub_v1.types import pubsub 

40 

41 

42from .rest_base import _BasePublisherRestTransport 

43from .base import DEFAULT_CLIENT_INFO as BASE_DEFAULT_CLIENT_INFO 

44 

# Type alias used for the ``retry`` argument of every RPC stub below. When
# ``gapic_v1.method`` exposes no ``_MethodDefault`` attribute the lookup raises
# AttributeError and the alias falls back to plain ``object``.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

# Request/response debug logging in the stubs is gated on this flag, which is
# True only when ``google.api_core.client_logging`` can be imported.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger shared by every stub in this file.
_LOGGER = logging.getLogger(__name__)

# Client info for the REST transport: same gapic version as the base
# transport, no gRPC version, and the installed ``requests`` version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=BASE_DEFAULT_CLIENT_INFO.gapic_version,
    grpc_version=None,
    rest_version=f"requests@{requests_version}",
)

# Only set the protobuf runtime version when the ClientInfo object already
# exposes that attribute.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

67 

68 

class PublisherRestInterceptor:
    """Interceptor for Publisher.

    Interceptors are used to manipulate requests, request metadata, and responses
    in arbitrary ways.
    Example use cases include:
    * Logging
    * Verifying requests according to service or custom semantics
    * Stripping extraneous information from responses

    These use cases and more can be enabled by injecting an
    instance of a custom subclass when constructing the PublisherRestTransport.

    Every hook defined on this base class is a pass-through that returns its
    arguments unchanged, so a subclass only needs to override the hooks it
    cares about.

    .. code-block:: python
        class MyCustomPublisherInterceptor(PublisherRestInterceptor):
            def pre_create_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_create_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_delete_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def pre_detach_subscription(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_detach_subscription(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_get_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_get_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topics(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topics(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topic_snapshots(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topic_snapshots(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_list_topic_subscriptions(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_list_topic_subscriptions(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_publish(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_publish(self, response):
                logging.info(f"Received response: {response}")
                return response

            def pre_update_topic(self, request, metadata):
                logging.info(f"Received request: {request}")
                return request, metadata

            def post_update_topic(self, response):
                logging.info(f"Received response: {response}")
                return response

        transport = PublisherRestTransport(interceptor=MyCustomPublisherInterceptor())
        client = PublisherClient(transport=transport)


    """

    def pre_create_topic(
        self, request: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for create_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_create_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for create_topic

        DEPRECATED. Please use the `post_create_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_create_topic` interceptor runs
        before the `post_create_topic_with_metadata` interceptor.
        """
        return response

    def post_create_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for create_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_create_topic_with_metadata`
        interceptor in new development instead of the `post_create_topic` interceptor.
        When both interceptors are used, this `post_create_topic_with_metadata` interceptor runs after the
        `post_create_topic` interceptor. The (possibly modified) response returned by
        `post_create_topic` will be passed to
        `post_create_topic_with_metadata`.
        """
        return response, metadata

    def pre_delete_topic(
        self,
        request: pubsub.DeleteTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.DeleteTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for delete_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged. This
        class defines no matching post hook for delete_topic.
        """
        return request, metadata

    def pre_detach_subscription(
        self,
        request: pubsub.DetachSubscriptionRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.DetachSubscriptionRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for detach_subscription

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_detach_subscription(
        self, response: pubsub.DetachSubscriptionResponse
    ) -> pubsub.DetachSubscriptionResponse:
        """Post-rpc interceptor for detach_subscription

        DEPRECATED. Please use the `post_detach_subscription_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_detach_subscription` interceptor runs
        before the `post_detach_subscription_with_metadata` interceptor.
        """
        return response

    def post_detach_subscription_with_metadata(
        self,
        response: pubsub.DetachSubscriptionResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.DetachSubscriptionResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for detach_subscription

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_detach_subscription_with_metadata`
        interceptor in new development instead of the `post_detach_subscription` interceptor.
        When both interceptors are used, this `post_detach_subscription_with_metadata` interceptor runs after the
        `post_detach_subscription` interceptor. The (possibly modified) response returned by
        `post_detach_subscription` will be passed to
        `post_detach_subscription_with_metadata`.
        """
        return response, metadata

    def pre_get_topic(
        self,
        request: pubsub.GetTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.GetTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for get_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_get_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for get_topic

        DEPRECATED. Please use the `post_get_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_get_topic` interceptor runs
        before the `post_get_topic_with_metadata` interceptor.
        """
        return response

    def post_get_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for get_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_get_topic_with_metadata`
        interceptor in new development instead of the `post_get_topic` interceptor.
        When both interceptors are used, this `post_get_topic_with_metadata` interceptor runs after the
        `post_get_topic` interceptor. The (possibly modified) response returned by
        `post_get_topic` will be passed to
        `post_get_topic_with_metadata`.
        """
        return response, metadata

    def pre_list_topics(
        self,
        request: pubsub.ListTopicsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.ListTopicsRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for list_topics

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_list_topics(
        self, response: pubsub.ListTopicsResponse
    ) -> pubsub.ListTopicsResponse:
        """Post-rpc interceptor for list_topics

        DEPRECATED. Please use the `post_list_topics_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topics` interceptor runs
        before the `post_list_topics_with_metadata` interceptor.
        """
        return response

    def post_list_topics_with_metadata(
        self,
        response: pubsub.ListTopicsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.ListTopicsResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for list_topics

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topics_with_metadata`
        interceptor in new development instead of the `post_list_topics` interceptor.
        When both interceptors are used, this `post_list_topics_with_metadata` interceptor runs after the
        `post_list_topics` interceptor. The (possibly modified) response returned by
        `post_list_topics` will be passed to
        `post_list_topics_with_metadata`.
        """
        return response, metadata

    def pre_list_topic_snapshots(
        self,
        request: pubsub.ListTopicSnapshotsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSnapshotsRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for list_topic_snapshots

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_list_topic_snapshots(
        self, response: pubsub.ListTopicSnapshotsResponse
    ) -> pubsub.ListTopicSnapshotsResponse:
        """Post-rpc interceptor for list_topic_snapshots

        DEPRECATED. Please use the `post_list_topic_snapshots_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topic_snapshots` interceptor runs
        before the `post_list_topic_snapshots_with_metadata` interceptor.
        """
        return response

    def post_list_topic_snapshots_with_metadata(
        self,
        response: pubsub.ListTopicSnapshotsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSnapshotsResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for list_topic_snapshots

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topic_snapshots_with_metadata`
        interceptor in new development instead of the `post_list_topic_snapshots` interceptor.
        When both interceptors are used, this `post_list_topic_snapshots_with_metadata` interceptor runs after the
        `post_list_topic_snapshots` interceptor. The (possibly modified) response returned by
        `post_list_topic_snapshots` will be passed to
        `post_list_topic_snapshots_with_metadata`.
        """
        return response, metadata

    def pre_list_topic_subscriptions(
        self,
        request: pubsub.ListTopicSubscriptionsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSubscriptionsRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for list_topic_subscriptions

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_list_topic_subscriptions(
        self, response: pubsub.ListTopicSubscriptionsResponse
    ) -> pubsub.ListTopicSubscriptionsResponse:
        """Post-rpc interceptor for list_topic_subscriptions

        DEPRECATED. Please use the `post_list_topic_subscriptions_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_list_topic_subscriptions` interceptor runs
        before the `post_list_topic_subscriptions_with_metadata` interceptor.
        """
        return response

    def post_list_topic_subscriptions_with_metadata(
        self,
        response: pubsub.ListTopicSubscriptionsResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        pubsub.ListTopicSubscriptionsResponse, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Post-rpc interceptor for list_topic_subscriptions

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_list_topic_subscriptions_with_metadata`
        interceptor in new development instead of the `post_list_topic_subscriptions` interceptor.
        When both interceptors are used, this `post_list_topic_subscriptions_with_metadata` interceptor runs after the
        `post_list_topic_subscriptions` interceptor. The (possibly modified) response returned by
        `post_list_topic_subscriptions` will be passed to
        `post_list_topic_subscriptions_with_metadata`.
        """
        return response, metadata

    def pre_publish(
        self,
        request: pubsub.PublishRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.PublishRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for publish

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_publish(self, response: pubsub.PublishResponse) -> pubsub.PublishResponse:
        """Post-rpc interceptor for publish

        DEPRECATED. Please use the `post_publish_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_publish` interceptor runs
        before the `post_publish_with_metadata` interceptor.
        """
        return response

    def post_publish_with_metadata(
        self,
        response: pubsub.PublishResponse,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.PublishResponse, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for publish

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_publish_with_metadata`
        interceptor in new development instead of the `post_publish` interceptor.
        When both interceptors are used, this `post_publish_with_metadata` interceptor runs after the
        `post_publish` interceptor. The (possibly modified) response returned by
        `post_publish` will be passed to
        `post_publish_with_metadata`.
        """
        return response, metadata

    def pre_update_topic(
        self,
        request: pubsub.UpdateTopicRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[pubsub.UpdateTopicRequest, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Pre-rpc interceptor for update_topic

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_update_topic(self, response: pubsub.Topic) -> pubsub.Topic:
        """Post-rpc interceptor for update_topic

        DEPRECATED. Please use the `post_update_topic_with_metadata`
        interceptor instead.

        Override in a subclass to read or manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code. This `post_update_topic` interceptor runs
        before the `post_update_topic_with_metadata` interceptor.
        """
        return response

    def post_update_topic_with_metadata(
        self, response: pubsub.Topic, metadata: Sequence[Tuple[str, Union[str, bytes]]]
    ) -> Tuple[pubsub.Topic, Sequence[Tuple[str, Union[str, bytes]]]]:
        """Post-rpc interceptor for update_topic

        Override in a subclass to read or manipulate the response or metadata after it
        is returned by the Publisher server but before it is returned to user code.

        We recommend only using this `post_update_topic_with_metadata`
        interceptor in new development instead of the `post_update_topic` interceptor.
        When both interceptors are used, this `post_update_topic_with_metadata` interceptor runs after the
        `post_update_topic` interceptor. The (possibly modified) response returned by
        `post_update_topic` will be passed to
        `post_update_topic_with_metadata`.
        """
        return response, metadata

    def pre_get_iam_policy(
        self,
        request: iam_policy_pb2.GetIamPolicyRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.GetIamPolicyRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for get_iam_policy

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_get_iam_policy(self, response: policy_pb2.Policy) -> policy_pb2.Policy:
        """Post-rpc interceptor for get_iam_policy

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.

        The default implementation returns the response unchanged.
        """
        return response

    def pre_set_iam_policy(
        self,
        request: iam_policy_pb2.SetIamPolicyRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.SetIamPolicyRequest, Sequence[Tuple[str, Union[str, bytes]]]
    ]:
        """Pre-rpc interceptor for set_iam_policy

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_set_iam_policy(self, response: policy_pb2.Policy) -> policy_pb2.Policy:
        """Post-rpc interceptor for set_iam_policy

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.

        The default implementation returns the response unchanged.
        """
        return response

    def pre_test_iam_permissions(
        self,
        request: iam_policy_pb2.TestIamPermissionsRequest,
        metadata: Sequence[Tuple[str, Union[str, bytes]]],
    ) -> Tuple[
        iam_policy_pb2.TestIamPermissionsRequest,
        Sequence[Tuple[str, Union[str, bytes]]],
    ]:
        """Pre-rpc interceptor for test_iam_permissions

        Override in a subclass to manipulate the request or metadata
        before they are sent to the Publisher server.

        The default implementation returns both arguments unchanged.
        """
        return request, metadata

    def post_test_iam_permissions(
        self, response: iam_policy_pb2.TestIamPermissionsResponse
    ) -> iam_policy_pb2.TestIamPermissionsResponse:
        """Post-rpc interceptor for test_iam_permissions

        Override in a subclass to manipulate the response
        after it is returned by the Publisher server but before
        it is returned to user code.

        The default implementation returns the response unchanged.
        """
        return response

605 

606 

@dataclasses.dataclass
class PublisherRestStub:
    """State shared by the per-RPC callable stubs of the REST transport."""

    # Session whose HTTP-verb methods are used to send each request.
    _session: AuthorizedSession
    # Host prefix that is prepended to the transcoded request URI.
    _host: str
    # Interceptor whose pre/post hooks are run around every call.
    _interceptor: PublisherRestInterceptor

612 

613 

614class PublisherRestTransport(_BasePublisherRestTransport): 

615 """REST backend synchronous transport for Publisher. 

616 

617 The service that an application uses to manipulate topics, 

618 and to send messages to a topic. 

619 

620 This class defines the same methods as the primary client, so the 

621 primary client can load the underlying transport implementation 

622 and call it. 

623 

624 It sends JSON representations of protocol buffers over HTTP/1.1 

625 """ 

626 

def __init__(
    self,
    *,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    url_scheme: str = "https",
    interceptor: Optional[PublisherRestInterceptor] = None,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
             The hostname to connect to (default: 'pubsub.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
        credentials_file (Optional[str]): A file with credentials that can
            be loaded with :func:`google.auth.load_credentials_from_file`.
            Accepted for signature compatibility; this constructor does
            not currently use it (see the TODO below).
        scopes (Optional(Sequence[str])): A list of scopes. Accepted for
            signature compatibility; this constructor does not currently
            use it (see the TODO below).
        client_cert_source_for_mtls (Callable[[], Tuple[bytes, bytes]]): Client
            certificate source used to configure a mutual TLS channel on
            the HTTP session. Skipped when falsy.
        quota_project_id (Optional[str]): An optional project to use for billing
            and quota. Accepted for signature compatibility; this
            constructor does not currently use it (see the TODO below).
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you are developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT should
            be used for service account credentials.
        url_scheme: the protocol scheme for the API endpoint.  Normally
            "https", but for testing or local servers,
            "http" can be specified.
        interceptor (Optional[PublisherRestInterceptor]): Hooks run around
            every RPC. A default ``PublisherRestInterceptor`` is created
            when none is supplied.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.
    """
    # Run the base constructor
    # TODO(yon-mg): resolve other ctor params i.e. scopes, quota, etc.
    # TODO: When custom host (api_endpoint) is set, `scopes` must *also* be set on the
    # credentials object
    super().__init__(
        host=host,
        credentials=credentials,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        url_scheme=url_scheme,
        api_audience=api_audience,
    )

    # The session is built from the credentials resolved by the base class.
    session = AuthorizedSession(self._credentials, default_host=self.DEFAULT_HOST)
    self._session = session
    if client_cert_source_for_mtls:
        session.configure_mtls_channel(client_cert_source_for_mtls)

    # Fall back to the pass-through interceptor when the caller gave none.
    self._interceptor = interceptor if interceptor else PublisherRestInterceptor()
    self._prep_wrapped_messages(client_info)

693 

class _CreateTopic(_BasePublisherRestTransport._BaseCreateTopic, PublisherRestStub):
    """Callable stub that performs the CreateTopic RPC over HTTP/JSON."""

    def __hash__(self):
        return hash("PublisherRestTransport.CreateTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request and return the raw HTTP response.

        Args:
            host: Prefix that is concatenated with the transcoded URI.
            metadata: Key/value pairs sent as HTTP headers.
            query_params: Query parameters, flattened before sending.
            session: Object whose attribute named by the transcoded
                ``method`` is called to perform the request.
            timeout: Timeout passed through to the session call.
            transcoded_request: Mapping providing ``"uri"`` and ``"method"``.
            body: Request payload sent as ``data``.

        Returns:
            Whatever the session's HTTP-verb method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: pubsub.Topic,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the create topic method over HTTP.

        Args:
            request (~.pubsub.Topic):
                The request object. A topic resource.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not read by this method's body;
                presumably applied by the caller's wrapping layer.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            core_exceptions.GoogleAPICallError: The subclass built by
                ``core_exceptions.from_http_response`` when the HTTP
                status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseCreateTopic._get_http_options()
        )

        request, metadata = self._interceptor.pre_create_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseCreateTopic._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseCreateTopic._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseCreateTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best-effort: a request that cannot be serialised
            # is logged with a ``None`` payload instead of failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.CreateTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "CreateTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._CreateTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        pb_resp = pubsub.Topic.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        # The deprecated hook runs first; its result feeds the metadata-aware hook.
        resp = self._interceptor.post_create_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_create_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialise the parsed Topic (``resp``), not the raw HTTP
            # ``response`` object, so the logged payload is populated.
            try:
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.create_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "CreateTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

845 

class _DeleteTopic(_BasePublisherRestTransport._BaseDeleteTopic, PublisherRestStub):
    """REST stub that performs the ``DeleteTopic`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.DeleteTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.DeleteTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ):
        r"""Call the delete topic method over HTTP.

        Args:
            request (~.pubsub.DeleteTopicRequest):
                The request object. Request for the ``DeleteTopic`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_http_options()
        )

        request, metadata = self._interceptor.pre_delete_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseDeleteTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                # Serialize the same way the sibling stubs do, via the
                # request's own message class.
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.DeleteTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DeleteTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._DeleteTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

953 

class _DetachSubscription(
    _BasePublisherRestTransport._BaseDetachSubscription, PublisherRestStub
):
    """REST stub that performs the ``DetachSubscription`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.DetachSubscription")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.DetachSubscriptionRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.DetachSubscriptionResponse:
        r"""Call the detach subscription method over HTTP.

        Args:
            request (~.pubsub.DetachSubscriptionRequest):
                The request object. Request for the DetachSubscription
                method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.DetachSubscriptionResponse:
                Response for the DetachSubscription
                method. Reserved for future use.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseDetachSubscription._get_http_options()
        )

        request, metadata = self._interceptor.pre_detach_subscription(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseDetachSubscription._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseDetachSubscription._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.DetachSubscription",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DetachSubscription",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._DetachSubscription._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.DetachSubscriptionResponse()
        pb_resp = pubsub.DetachSubscriptionResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_detach_subscription(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_detach_subscription_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message (as returned by the
                # interceptors), not the raw HTTP response object.
                response_payload = pubsub.DetachSubscriptionResponse.to_json(resp)
            except Exception:
                # Best effort: logging must not turn a successful call into
                # a failure.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.detach_subscription",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "DetachSubscription",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1104 

class _GetTopic(_BasePublisherRestTransport._BaseGetTopic, PublisherRestStub):
    """REST stub that performs the ``GetTopic`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.GetTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.GetTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the get topic method over HTTP.

        Args:
            request (~.pubsub.GetTopicRequest):
                The request object. Request for the GetTopic method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = _BasePublisherRestTransport._BaseGetTopic._get_http_options()

        request, metadata = self._interceptor.pre_get_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseGetTopic._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseGetTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.GetTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._GetTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        pb_resp = pubsub.Topic.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_get_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_get_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message (as returned by the
                # interceptors), not the raw HTTP response object.
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                # Best effort: logging must not turn a successful call into
                # a failure.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.get_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1248 

class _ListTopics(_BasePublisherRestTransport._BaseListTopics, PublisherRestStub):
    """REST stub that performs the ``ListTopics`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopics")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicsResponse:
        r"""Call the list topics method over HTTP.

        Args:
            request (~.pubsub.ListTopicsRequest):
                The request object. Request for the ``ListTopics`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicsResponse:
                Response for the ``ListTopics`` method.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopics._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topics(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseListTopics._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseListTopics._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopics",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopics",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopics._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicsResponse()
        pb_resp = pubsub.ListTopicsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topics(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topics_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message (as returned by the
                # interceptors), not the raw HTTP response object.
                response_payload = pubsub.ListTopicsResponse.to_json(resp)
            except Exception:
                # Best effort: logging must not turn a successful call into
                # a failure.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topics",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopics",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1394 

class _ListTopicSnapshots(
    _BasePublisherRestTransport._BaseListTopicSnapshots, PublisherRestStub
):
    """REST stub that performs the ``ListTopicSnapshots`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopicSnapshots")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicSnapshotsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicSnapshotsResponse:
        r"""Call the list topic snapshots method over HTTP.

        Args:
            request (~.pubsub.ListTopicSnapshotsRequest):
                The request object. Request for the ``ListTopicSnapshots`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicSnapshotsResponse:
                Response for the ``ListTopicSnapshots`` method.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopicSnapshots._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topic_snapshots(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseListTopicSnapshots._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseListTopicSnapshots._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopicSnapshots",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSnapshots",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopicSnapshots._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicSnapshotsResponse()
        pb_resp = pubsub.ListTopicSnapshotsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topic_snapshots(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topic_snapshots_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message (as returned by the
                # interceptors), not the raw HTTP response object.
                response_payload = pubsub.ListTopicSnapshotsResponse.to_json(resp)
            except Exception:
                # Best effort: logging must not turn a successful call into
                # a failure.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topic_snapshots",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSnapshots",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1542 

class _ListTopicSubscriptions(
    _BasePublisherRestTransport._BaseListTopicSubscriptions, PublisherRestStub
):
    """REST stub that performs the ``ListTopicSubscriptions`` call over HTTP."""

    def __hash__(self):
        return hash("PublisherRestTransport.ListTopicSubscriptions")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return the raw response.

        Args:
            host: Prefix prepended to the transcoded URI to form the URL.
            metadata: Key/value pairs copied into the HTTP headers.
            query_params: Query parameters; flattened before being sent.
            session: Object exposing one callable per HTTP method name.
            timeout: Timeout forwarded to the session call.
            transcoded_request: Mapping providing the ``"uri"`` and
                ``"method"`` entries.
            body: Accepted but not sent by this stub.

        Returns:
            Whatever the session call returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: pubsub.ListTopicSubscriptionsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.ListTopicSubscriptionsResponse:
        r"""Call the list topic subscriptions method over HTTP.

        Args:
            request (~.pubsub.ListTopicSubscriptionsRequest):
                The request object. Request for the ``ListTopicSubscriptions`` method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.ListTopicSubscriptionsResponse:
                Response for the ``ListTopicSubscriptions`` method.

        Raises:
            google.api_core.exceptions.GoogleAPICallError: If the HTTP
                response status code is 400 or greater.
        """

        http_options = (
            _BasePublisherRestTransport._BaseListTopicSubscriptions._get_http_options()
        )

        request, metadata = self._interceptor.pre_list_topic_subscriptions(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseListTopicSubscriptions._get_transcoded_request(
            http_options, request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseListTopicSubscriptions._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                # Best effort: a payload that cannot be serialized must not
                # prevent the request from being sent.
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.ListTopicSubscriptions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSubscriptions",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._ListTopicSubscriptions._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.ListTopicSubscriptionsResponse()
        pb_resp = pubsub.ListTopicSubscriptionsResponse.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_list_topic_subscriptions(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_list_topic_subscriptions_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                # Serialize the parsed message (as returned by the
                # interceptors), not the raw HTTP response object.
                response_payload = pubsub.ListTopicSubscriptionsResponse.to_json(
                    resp
                )
            except Exception:
                # Best effort: logging must not turn a successful call into
                # a failure.
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.list_topic_subscriptions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "ListTopicSubscriptions",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1690 

1691 class _Publish(_BasePublisherRestTransport._BasePublish, PublisherRestStub): 

1692 def __hash__(self): 

1693 return hash("PublisherRestTransport.Publish") 

1694 

@staticmethod
def _get_response(
    host,
    metadata,
    query_params,
    session,
    timeout,
    transcoded_request,
    body=None,
):
    """Send the transcoded request, including its body, and return the raw response.

    The URL is ``host`` followed by the transcoded URI. ``metadata`` is
    copied into the headers with ``Content-Type`` set to
    ``application/json``, the query parameters are flattened, and ``body``
    is passed as the request data.
    """
    target_path = transcoded_request["uri"]
    verb = transcoded_request["method"]

    request_headers = dict(metadata)
    request_headers["Content-Type"] = "application/json"

    # The session exposes one callable per HTTP method name.
    send = getattr(session, verb)
    return send(
        f"{host}{target_path}",
        timeout=timeout,
        headers=request_headers,
        params=rest_helpers.flatten_query_params(query_params, strict=True),
        data=body,
    )

1717 

def __call__(
    self,
    request: pubsub.PublishRequest,
    *,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Optional[float] = None,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> pubsub.PublishResponse:
    r"""Call the publish method over HTTP.

    Args:
        request (~.pubsub.PublishRequest):
            The request object. Request for the Publish method.
        retry (google.api_core.retry.Retry): Designation of what errors, if any,
            should be retried. Not referenced in this method body.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
            sent along with the request as metadata. Normally, each value must be of type `str`,
            but for metadata keys ending with the suffix `-bin`, the corresponding values must
            be of type `bytes`.

    Returns:
        ~.pubsub.PublishResponse:
            Response for the ``Publish`` method.

    Raises:
        core_exceptions.GoogleAPICallError: The matching subclass is raised
            when the HTTP status code of the response is 400 or higher.
    """

    http_options = _BasePublisherRestTransport._BasePublish._get_http_options()

    # Give the interceptor a chance to rewrite the request and metadata
    # before the request is transcoded.
    request, metadata = self._interceptor.pre_publish(request, metadata)
    transcoded_request = (
        _BasePublisherRestTransport._BasePublish._get_transcoded_request(
            http_options, request
        )
    )

    body = _BasePublisherRestTransport._BasePublish._get_request_body_json(
        transcoded_request
    )

    # Jsonify the query params
    query_params = (
        _BasePublisherRestTransport._BasePublish._get_query_params_json(
            transcoded_request
        )
    )

    if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
        logging.DEBUG
    ):  # pragma: NO COVER
        request_url = "{host}{uri}".format(
            host=self._host, uri=transcoded_request["uri"]
        )
        method = transcoded_request["method"]
        # Logging is best effort: a payload that cannot be serialized is
        # logged as None rather than failing the call.
        try:
            request_payload = type(request).to_json(request)
        except Exception:
            request_payload = None
        http_request = {
            "payload": request_payload,
            "requestMethod": method,
            "requestUrl": request_url,
            "headers": dict(metadata),
        }
        _LOGGER.debug(
            "Sending request for google.pubsub_v1.PublisherClient.Publish",
            extra={
                "serviceName": "google.pubsub.v1.Publisher",
                "rpcName": "Publish",
                "httpRequest": http_request,
                "metadata": http_request["headers"],
            },
        )

    # Send the request
    response = PublisherRestTransport._Publish._get_response(
        self._host,
        metadata,
        query_params,
        self._session,
        timeout,
        transcoded_request,
        body,
    )

    # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
    # subclass.
    if response.status_code >= 400:
        raise core_exceptions.from_http_response(response)

    # Return the response
    resp = pubsub.PublishResponse()
    pb_resp = pubsub.PublishResponse.pb(resp)

    json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

    resp = self._interceptor.post_publish(resp)
    response_metadata = [(k, str(v)) for k, v in response.headers.items()]
    resp, _ = self._interceptor.post_publish_with_metadata(
        resp, response_metadata
    )
    if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
        logging.DEBUG
    ):  # pragma: NO COVER
        # Serialize the parsed message (``resp``), not the HTTP response
        # object, so the logged payload is the actual PublishResponse.
        try:
            response_payload = pubsub.PublishResponse.to_json(resp)
        except Exception:
            response_payload = None
        http_response = {
            "payload": response_payload,
            "headers": dict(response.headers),
            "status": response.status_code,
        }
        _LOGGER.debug(
            "Received response for google.pubsub_v1.PublisherClient.publish",
            extra={
                "serviceName": "google.pubsub.v1.Publisher",
                "rpcName": "Publish",
                "metadata": http_response["headers"],
                "httpResponse": http_response,
            },
        )
    return resp

1840 

class _UpdateTopic(_BasePublisherRestTransport._BaseUpdateTopic, PublisherRestStub):
    """REST stub that issues the ``UpdateTopic`` call over HTTP."""

    def __hash__(self):
        # Hash on a fixed string: every instance of this stub hashes alike.
        return hash("PublisherRestTransport.UpdateTopic")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its response.

        Args:
            host: Prefix joined with ``transcoded_request["uri"]`` to form the URL.
            metadata: Key/value pairs turned into the request headers.
            query_params: Query parameters, flattened before sending.
            session: Object whose attribute named by
                ``transcoded_request["method"]`` is called to send the request.
            timeout: Forwarded as the ``timeout`` keyword of the session call.
            transcoded_request: Mapping that provides ``"uri"`` and ``"method"``.
            body: Forwarded as the ``data`` keyword of the session call.

        Returns:
            Whatever the session method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: pubsub.UpdateTopicRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> pubsub.Topic:
        r"""Call the update topic method over HTTP.

        Args:
            request (~.pubsub.UpdateTopicRequest):
                The request object. Request for the UpdateTopic method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced in this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            ~.pubsub.Topic:
                A topic resource.

        Raises:
            core_exceptions.GoogleAPICallError: The matching subclass is raised
                when the HTTP status code of the response is 400 or higher.
        """

        http_options = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before the request is transcoded.
        request, metadata = self._interceptor.pre_update_topic(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseUpdateTopic._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseUpdateTopic._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best effort: a payload that cannot be serialized is
            # logged as None rather than failing the call.
            try:
                request_payload = type(request).to_json(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.UpdateTopic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "UpdateTopic",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._UpdateTopic._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        # Return the response
        resp = pubsub.Topic()
        pb_resp = pubsub.Topic.pb(resp)

        json_format.Parse(response.content, pb_resp, ignore_unknown_fields=True)

        resp = self._interceptor.post_update_topic(resp)
        response_metadata = [(k, str(v)) for k, v in response.headers.items()]
        resp, _ = self._interceptor.post_update_topic_with_metadata(
            resp, response_metadata
        )
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            # Serialize the parsed message (``resp``), not the HTTP response
            # object, so the logged payload is the actual Topic.
            try:
                response_payload = pubsub.Topic.to_json(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.update_topic",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "UpdateTopic",
                    "metadata": http_response["headers"],
                    "httpResponse": http_response,
                },
            )
        return resp

1992 

@property
def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
    """Return a ``_CreateTopic`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._CreateTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

1998 

@property
def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]:
    """Return a ``_DeleteTopic`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._DeleteTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2004 

@property
def detach_subscription(
    self,
) -> Callable[
    [pubsub.DetachSubscriptionRequest], pubsub.DetachSubscriptionResponse
]:
    """Return a ``_DetachSubscription`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._DetachSubscription(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2014 

@property
def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
    """Return a ``_GetTopic`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._GetTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2020 

@property
def list_topics(
    self,
) -> Callable[[pubsub.ListTopicsRequest], pubsub.ListTopicsResponse]:
    """Return a ``_ListTopics`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._ListTopics(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2028 

@property
def list_topic_snapshots(
    self,
) -> Callable[
    [pubsub.ListTopicSnapshotsRequest], pubsub.ListTopicSnapshotsResponse
]:
    """Return a ``_ListTopicSnapshots`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._ListTopicSnapshots(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2038 

@property
def list_topic_subscriptions(
    self,
) -> Callable[
    [pubsub.ListTopicSubscriptionsRequest], pubsub.ListTopicSubscriptionsResponse
]:
    """Return a ``_ListTopicSubscriptions`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._ListTopicSubscriptions(
        self._session, self._host, self._interceptor
    )
    return stub  # type: ignore

2048 

@property
def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
    """Return a ``_Publish`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._Publish(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2054 

@property
def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
    """Return an ``_UpdateTopic`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    # The declared return type is fine; mypy just cannot work out what is
    # going on here (in C++ this would require a dynamic_cast).
    stub = self._UpdateTopic(self._session, self._host, self._interceptor)
    return stub  # type: ignore

2060 

@property
def get_iam_policy(self):
    """Return a ``_GetIamPolicy`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    stub_factory = self._GetIamPolicy
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

2064 

class _GetIamPolicy(
    _BasePublisherRestTransport._BaseGetIamPolicy, PublisherRestStub
):
    """REST stub that issues the ``GetIamPolicy`` call over HTTP."""

    def __hash__(self):
        # Hash on a fixed string: every instance of this stub hashes alike.
        return hash("PublisherRestTransport.GetIamPolicy")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its response.

        Args:
            host: Prefix joined with ``transcoded_request["uri"]`` to form the URL.
            metadata: Key/value pairs turned into the request headers.
            query_params: Query parameters, flattened before sending.
            session: Object whose attribute named by
                ``transcoded_request["method"]`` is called to send the request.
            timeout: Forwarded as the ``timeout`` keyword of the session call.
            transcoded_request: Mapping that provides ``"uri"`` and ``"method"``.
            body: Accepted for signature parity; not sent by this stub.

        Returns:
            Whatever the session method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.GetIamPolicyRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> policy_pb2.Policy:
        r"""Call the get iam policy method over HTTP.

        Args:
            request (iam_policy_pb2.GetIamPolicyRequest):
                The request object for GetIamPolicy method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced in this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            policy_pb2.Policy: Response from GetIamPolicy method.

        Raises:
            core_exceptions.GoogleAPICallError: The matching subclass is raised
                when the HTTP status code of the response is 400 or higher.
        """

        http_options = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before the request is transcoded.
        request, metadata = self._interceptor.pre_get_iam_policy(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_transcoded_request(
                http_options, request
            )
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseGetIamPolicy._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best effort: a payload that cannot be serialized is
            # logged as None rather than failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.GetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetIamPolicy",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._GetIamPolicy._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = policy_pb2.Policy()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_get_iam_policy(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # Name the same client as the request log above so the two
            # messages of one call can be matched up.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.GetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "GetIamPolicy",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2204 

@property
def set_iam_policy(self):
    """Return a ``_SetIamPolicy`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    stub_factory = self._SetIamPolicy
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

2208 

class _SetIamPolicy(
    _BasePublisherRestTransport._BaseSetIamPolicy, PublisherRestStub
):
    """REST stub that issues the ``SetIamPolicy`` call over HTTP."""

    def __hash__(self):
        # Hash on a fixed string: every instance of this stub hashes alike.
        return hash("PublisherRestTransport.SetIamPolicy")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its response.

        Args:
            host: Prefix joined with ``transcoded_request["uri"]`` to form the URL.
            metadata: Key/value pairs turned into the request headers.
            query_params: Query parameters, flattened before sending.
            session: Object whose attribute named by
                ``transcoded_request["method"]`` is called to send the request.
            timeout: Forwarded as the ``timeout`` keyword of the session call.
            transcoded_request: Mapping that provides ``"uri"`` and ``"method"``.
            body: Forwarded as the ``data`` keyword of the session call.

        Returns:
            Whatever the session method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.SetIamPolicyRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> policy_pb2.Policy:
        r"""Call the set iam policy method over HTTP.

        Args:
            request (iam_policy_pb2.SetIamPolicyRequest):
                The request object for SetIamPolicy method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced in this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            policy_pb2.Policy: Response from SetIamPolicy method.

        Raises:
            core_exceptions.GoogleAPICallError: The matching subclass is raised
                when the HTTP status code of the response is 400 or higher.
        """

        http_options = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before the request is transcoded.
        request, metadata = self._interceptor.pre_set_iam_policy(request, metadata)
        transcoded_request = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_transcoded_request(
                http_options, request
            )
        )

        body = _BasePublisherRestTransport._BaseSetIamPolicy._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = (
            _BasePublisherRestTransport._BaseSetIamPolicy._get_query_params_json(
                transcoded_request
            )
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best effort: a payload that cannot be serialized is
            # logged as None rather than failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.SetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "SetIamPolicy",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._SetIamPolicy._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = policy_pb2.Policy()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_set_iam_policy(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # Name the same client as the request log above so the two
            # messages of one call can be matched up.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.SetIamPolicy",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "SetIamPolicy",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2354 

@property
def test_iam_permissions(self):
    """Return a ``_TestIamPermissions`` stub built from this transport's state.

    A fresh stub is constructed on every access from the transport's
    session, host and interceptor.
    """
    stub_factory = self._TestIamPermissions
    return stub_factory(self._session, self._host, self._interceptor)  # type: ignore

2358 

class _TestIamPermissions(
    _BasePublisherRestTransport._BaseTestIamPermissions, PublisherRestStub
):
    """REST stub that issues the ``TestIamPermissions`` call over HTTP."""

    def __hash__(self):
        # Hash on a fixed string: every instance of this stub hashes alike.
        return hash("PublisherRestTransport.TestIamPermissions")

    @staticmethod
    def _get_response(
        host,
        metadata,
        query_params,
        session,
        timeout,
        transcoded_request,
        body=None,
    ):
        """Send the transcoded request through ``session`` and return its response.

        Args:
            host: Prefix joined with ``transcoded_request["uri"]`` to form the URL.
            metadata: Key/value pairs turned into the request headers.
            query_params: Query parameters, flattened before sending.
            session: Object whose attribute named by
                ``transcoded_request["method"]`` is called to send the request.
            timeout: Forwarded as the ``timeout`` keyword of the session call.
            transcoded_request: Mapping that provides ``"uri"`` and ``"method"``.
            body: Forwarded as the ``data`` keyword of the session call.

        Returns:
            Whatever the session method returns.
        """
        uri = transcoded_request["uri"]
        method = transcoded_request["method"]
        headers = dict(metadata)
        headers["Content-Type"] = "application/json"
        response = getattr(session, method)(
            "{host}{uri}".format(host=host, uri=uri),
            timeout=timeout,
            headers=headers,
            params=rest_helpers.flatten_query_params(query_params, strict=True),
            data=body,
        )
        return response

    def __call__(
        self,
        request: iam_policy_pb2.TestIamPermissionsRequest,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
    ) -> iam_policy_pb2.TestIamPermissionsResponse:
        r"""Call the test iam permissions method over HTTP.

        Args:
            request (iam_policy_pb2.TestIamPermissionsRequest):
                The request object for TestIamPermissions method.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Not referenced in this method body.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
                sent along with the request as metadata. Normally, each value must be of type `str`,
                but for metadata keys ending with the suffix `-bin`, the corresponding values must
                be of type `bytes`.

        Returns:
            iam_policy_pb2.TestIamPermissionsResponse: Response from TestIamPermissions method.

        Raises:
            core_exceptions.GoogleAPICallError: The matching subclass is raised
                when the HTTP status code of the response is 400 or higher.
        """

        http_options = (
            _BasePublisherRestTransport._BaseTestIamPermissions._get_http_options()
        )

        # Give the interceptor a chance to rewrite the request and metadata
        # before the request is transcoded.
        request, metadata = self._interceptor.pre_test_iam_permissions(
            request, metadata
        )
        transcoded_request = _BasePublisherRestTransport._BaseTestIamPermissions._get_transcoded_request(
            http_options, request
        )

        body = _BasePublisherRestTransport._BaseTestIamPermissions._get_request_body_json(
            transcoded_request
        )

        # Jsonify the query params
        query_params = _BasePublisherRestTransport._BaseTestIamPermissions._get_query_params_json(
            transcoded_request
        )

        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            request_url = "{host}{uri}".format(
                host=self._host, uri=transcoded_request["uri"]
            )
            method = transcoded_request["method"]
            # Logging is best effort: a payload that cannot be serialized is
            # logged as None rather than failing the call.
            try:
                request_payload = json_format.MessageToJson(request)
            except Exception:
                request_payload = None
            http_request = {
                "payload": request_payload,
                "requestMethod": method,
                "requestUrl": request_url,
                "headers": dict(metadata),
            }
            _LOGGER.debug(
                "Sending request for google.pubsub_v1.PublisherClient.TestIamPermissions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "TestIamPermissions",
                    "httpRequest": http_request,
                    "metadata": http_request["headers"],
                },
            )

        # Send the request
        response = PublisherRestTransport._TestIamPermissions._get_response(
            self._host,
            metadata,
            query_params,
            self._session,
            timeout,
            transcoded_request,
            body,
        )

        # In case of error, raise the appropriate core_exceptions.GoogleAPICallError exception
        # subclass.
        if response.status_code >= 400:
            raise core_exceptions.from_http_response(response)

        content = response.content.decode("utf-8")
        resp = iam_policy_pb2.TestIamPermissionsResponse()
        resp = json_format.Parse(content, resp)
        resp = self._interceptor.post_test_iam_permissions(resp)
        if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            logging.DEBUG
        ):  # pragma: NO COVER
            try:
                response_payload = json_format.MessageToJson(resp)
            except Exception:
                response_payload = None
            http_response = {
                "payload": response_payload,
                "headers": dict(response.headers),
                "status": response.status_code,
            }
            # Name the same client as the request log above so the two
            # messages of one call can be matched up.
            _LOGGER.debug(
                "Received response for google.pubsub_v1.PublisherClient.TestIamPermissions",
                extra={
                    "serviceName": "google.pubsub.v1.Publisher",
                    "rpcName": "TestIamPermissions",
                    "httpResponse": http_response,
                    "metadata": http_response["headers"],
                },
            )
        return resp

2502 

@property
def kind(self) -> str:
    """Return the fixed identifier of this transport, ``"rest"``."""
    transport_kind = "rest"
    return transport_kind

2506 

def close(self):
    """Close the session held in ``self._session``."""
    session = self._session
    session.close()

2509 

2510 

# Names exported by ``from <this module> import *``.
__all__ = (
    "PublisherRestTransport",
)