Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/subscriber/transports/grpc.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

160 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json 

17import logging as std_logging 

18import pickle 

19import warnings 

20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

21 

22from google.api_core import grpc_helpers 

23from google.api_core import gapic_v1 

24import google.auth # type: ignore 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.auth.transport.grpc import SslCredentials # type: ignore 

27from google.protobuf.json_format import MessageToJson 

28import google.protobuf.message 

29 

30import grpc # type: ignore 

31import proto # type: ignore 

32 

33from google.iam.v1 import iam_policy_pb2 # type: ignore 

34from google.iam.v1 import policy_pb2 # type: ignore 

35from google.protobuf import empty_pb2 # type: ignore 

36from google.pubsub_v1.types import pubsub 

37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 

38 

# Detect whether the optional ``client_logging`` helper ships with the
# installed google-api-core, and record the outcome in a module-level flag.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger, named after this module.
_LOGGER = std_logging.getLogger(__name__)

47 

48 

class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Unary-unary client interceptor that writes DEBUG logs around each RPC.

    Logging only happens when ``CLIENT_LOGGING_SUPPORTED`` is true and the
    module logger is enabled for ``DEBUG``. In that case the request (payload
    and metadata) is logged before the call is continued, and the response
    (payload and trailing metadata) is logged after it. Otherwise the call is
    simply forwarded to ``continuation``.
    """

    @staticmethod
    def _serialize_payload(message):
        """Return a loggable string for a request or response object."""
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # Neither a proto-plus nor a protobuf message: log the type name
        # followed by the pickled bytes.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the call to ``continuation``, logging it when enabled.

        Args:
            continuation: Callable invoked as
                ``continuation(client_call_details, request)`` to carry on
                with the RPC.
            client_call_details: Call description; its ``method`` and
                ``metadata`` attributes are read for logging.
            request: The request message for the RPC.

        Returns:
            The object returned by ``continuation``, unchanged.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` is optional on the call details and is ``None``
            # when the caller attached none; treat that as empty instead of
            # failing while iterating it.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._serialize_payload(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": client_call_details.method,
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )

        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Flatten the trailing metadata into a plain dict of strings
            # (``None`` when there is no metadata).
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = response.result()
            grpc_response = {
                "payload": self._serialize_payload(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": client_call_details.method,
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

113 

114 

115class SubscriberGrpcTransport(SubscriberTransport): 

116 """gRPC backend transport for Subscriber. 

117 

118 The service that an application uses to manipulate subscriptions and 

119 to consume messages from a subscription via the ``Pull`` method or 

120 by establishing a bi-directional stream using the ``StreamingPull`` 

121 method. 

122 

123 This class defines the same methods as the primary client, so the 

124 primary client can load the underlying transport implementation 

125 and call it. 

126 

127 It sends protocol buffers over the wire using gRPC (which is built on 

128 top of HTTP/2); the ``grpcio`` package must be installed. 

129 """ 

130 

131 _stubs: Dict[str, Callable] 

132 

def __init__(
    self,
    *,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
    api_mtls_endpoint: Optional[str] = None,
    client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'pubsub.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials attached to requests, identifying
            the application to the service. When omitted, the client
            tries to ascertain credentials from the environment.
            Ignored if a ``channel`` instance is provided.
        credentials_file (Optional[str]): A credentials file loadable
            with :func:`google.auth.load_credentials_from_file`.
            Ignored if a ``channel`` instance is provided.
        scopes (Optional[Sequence[str]]): A list of scopes. Ignored if a
            ``channel`` instance is provided.
        channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
            Either a ``Channel`` instance to make calls through, or a
            callable that builds and returns one. With ``None``,
            ``self.create_channel`` builds the channel. A callable is
            invoked with the same arguments ``self.create_channel``
            would receive.
        api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
            endpoint. When given it replaces ``host`` and a mutual TLS
            channel is attempted, using client SSL credentials from
            ``client_cert_source`` or the application default SSL
            credentials.
        client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
            Deprecated. A callback returning the client SSL certificate
            bytes and private key bytes, both PEM encoded. Ignored when
            ``api_mtls_endpoint`` is None.
        ssl_channel_credentials (grpc.ChannelCredentials): SSL
            credentials for the gRPC channel. Ignored if a ``channel``
            instance is provided.
        client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
            A callback returning client certificate bytes and private
            key bytes, both PEM encoded, used to configure a mutual TLS
            channel. Ignored if a ``channel`` instance or
            ``ssl_channel_credentials`` is provided.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, default info is used. Usually
            only relevant when developing your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT
            should be used for service account credentials.
        api_audience (Optional[str]): Passed through unchanged to the
            base transport initializer.

    Raises:
        google.auth.exceptions.MutualTLSChannelError: If mutual TLS
            transport creation failed for any reason.
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    self._stubs: Dict[str, Callable] = {}
    self._grpc_channel = None
    self._ssl_channel_credentials = ssl_channel_credentials

    # Warn about each deprecated argument that was actually supplied.
    if api_mtls_endpoint:
        warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
    if client_cert_source:
        warnings.warn("client_cert_source is deprecated", DeprecationWarning)

    if isinstance(channel, grpc.Channel):
        # An explicit channel instance is used as-is; credentials and SSL
        # credentials are ignored in that case.
        self._grpc_channel = channel
        self._ssl_channel_credentials = None
        self._ignore_credentials = True
        credentials = None
    elif api_mtls_endpoint:
        # Deprecated mTLS path: the endpoint replaces the host, and SSL
        # credentials come from the callback or the application default.
        host = api_mtls_endpoint
        if client_cert_source:
            cert_bytes, key_bytes = client_cert_source()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_bytes, private_key=key_bytes
            )
        else:
            self._ssl_channel_credentials = SslCredentials().ssl_credentials
    elif client_cert_source_for_mtls and not ssl_channel_credentials:
        # Explicit ``ssl_channel_credentials`` take precedence over the
        # certificate callback.
        cert_bytes, key_bytes = client_cert_source_for_mtls()
        self._ssl_channel_credentials = grpc.ssl_channel_credentials(
            certificate_chain=cert_bytes, private_key=key_bytes
        )

    # The base transport sets the host, credentials and scopes.
    super().__init__(
        host=host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

    if not self._grpc_channel:
        # No channel instance was given: build one with the supplied
        # callable, or with this class's ``create_channel`` by default.
        channel_factory = channel or type(self).create_channel
        channel_options = [
            ("grpc.max_send_message_length", -1),
            ("grpc.max_receive_message_length", -1),
            ("grpc.max_metadata_size", 4 * 1024 * 1024),
            ("grpc.keepalive_time_ms", 30000),
        ]
        self._grpc_channel = channel_factory(
            self._host,
            # Use the credentials saved by the base transport ...
            credentials=self._credentials,
            # ... which is why no credentials file is passed again here.
            credentials_file=None,
            scopes=self._scopes,
            ssl_credentials=self._ssl_channel_credentials,
            quota_project_id=quota_project_id,
            options=channel_options,
        )

    # Stubs are created on a channel wrapped with the logging interceptor.
    self._interceptor = _LoggingClientInterceptor()
    self._logged_channel = grpc.intercept_channel(
        self._grpc_channel, self._interceptor
    )

    # Wrap messages. This must be done after self._logged_channel exists.
    self._prep_wrapped_messages(client_info)

279 

@classmethod
def create_channel(
    cls,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    quota_project_id: Optional[str] = None,
    **kwargs,
) -> grpc.Channel:
    """Create and return a gRPC channel object.

    The work is delegated to ``grpc_helpers.create_channel``, with this
    class's ``AUTH_SCOPES`` and ``DEFAULT_HOST`` supplied as the default
    scopes and default host.

    Args:
        host (Optional[str]): The host for the channel to use.
        credentials (Optional[~.Credentials]): The authorization
            credentials to attach to requests, identifying this
            application to the service. When omitted, the client tries
            to ascertain credentials from the environment.
        credentials_file (Optional[str]): A credentials file loadable
            with :func:`google.auth.load_credentials_from_file`.
            Mutually exclusive with ``credentials``.
        scopes (Optional[Sequence[str]]): An optional list of scopes
            needed for this service. Only used when credentials are not
            specified; they are passed to :func:`google.auth.default`.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        kwargs (Optional[dict]): Keyword arguments forwarded to the
            channel creation.

    Returns:
        grpc.Channel: A gRPC channel object.

    Raises:
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    new_channel = grpc_helpers.create_channel(
        host,
        default_host=cls.DEFAULT_HOST,
        default_scopes=cls.AUTH_SCOPES,
        scopes=scopes,
        credentials=credentials,
        credentials_file=credentials_file,
        quota_project_id=quota_project_id,
        **kwargs,
    )
    return new_channel

326 

@property
def grpc_channel(self) -> grpc.Channel:
    """Return the gRPC channel this transport stores in ``_grpc_channel``."""
    service_channel = self._grpc_channel
    return service_channel

331 

@property
def create_subscription(
    self,
) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
    r"""Return a callable for the create subscription method over gRPC.

    Creates a subscription to a given topic. See the [resource name
    rules]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
    Returns ``ALREADY_EXISTS`` if the subscription already exists and
    ``NOT_FOUND`` if the corresponding topic doesn't exist.

    When the request carries no name, the server assigns a random name
    for this subscription on the same project as the topic, conforming
    to the [resource name format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
    The generated name is populated in the returned Subscription
    object. Note that for REST API requests, you must specify a name
    in the request.

    Returns:
        Callable[[~.Subscription],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "create_subscription"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSubscription",
        request_serializer=pubsub.Subscription.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_key]

369 

@property
def get_subscription(
    self,
) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
    r"""Return a callable for the get subscription method over gRPC.

    Gets the configuration details of a subscription.

    Returns:
        Callable[[~.GetSubscriptionRequest],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "get_subscription"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSubscription",
        request_serializer=pubsub.GetSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_key]

395 

@property
def update_subscription(
    self,
) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
    r"""Return a callable for the update subscription method over gRPC.

    Updates an existing subscription by updating the fields specified
    in the update mask. Note that certain properties of a subscription,
    such as its topic, are not modifiable.

    Returns:
        Callable[[~.UpdateSubscriptionRequest],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "update_subscription"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSubscription",
        request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return self._stubs[stub_key]

424 

@property
def list_subscriptions(
    self,
) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
    r"""Return a callable for the list subscriptions method over gRPC.

    Lists matching subscriptions.

    Returns:
        Callable[[~.ListSubscriptionsRequest],
                ~.ListSubscriptionsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_subscriptions"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSubscriptions",
        request_serializer=pubsub.ListSubscriptionsRequest.serialize,
        response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
    )
    return self._stubs[stub_key]

450 

@property
def delete_subscription(
    self,
) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
    r"""Return a callable for the delete subscription method over gRPC.

    Deletes an existing subscription. All messages retained in the
    subscription are immediately dropped, and calls to ``Pull`` after
    deletion will return ``NOT_FOUND``. Once a subscription is deleted,
    a new one may be created with the same name, but the new one has
    no association with the old subscription or its topic unless the
    same topic is specified.

    Returns:
        Callable[[~.DeleteSubscriptionRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "delete_subscription"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSubscription",
        request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

481 

@property
def modify_ack_deadline(
    self,
) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
    r"""Return a callable for the modify ack deadline method over gRPC.

    Modifies the ack deadline for a specific message. The subscriber
    can use this to indicate that more time is needed to process a
    message, or to make the message available for redelivery if the
    processing was interrupted. Note that this does not modify the
    subscription-level ``ackDeadlineSeconds`` used for subsequent
    messages.

    Returns:
        Callable[[~.ModifyAckDeadlineRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "modify_ack_deadline"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
        request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

512 

@property
def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
    r"""Return a callable for the acknowledge method over gRPC.

    Acknowledges the messages associated with the ``ack_ids`` in the
    ``AcknowledgeRequest``. The Pub/Sub system can remove the
    relevant messages from the subscription.

    Acknowledging a message whose ack deadline has expired may
    succeed, but such a message may be redelivered later.
    Acknowledging a message more than once will not result in an
    error.

    Returns:
        Callable[[~.AcknowledgeRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "acknowledge"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Acknowledge",
        request_serializer=pubsub.AcknowledgeRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

543 

@property
def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
    r"""Return a callable for the pull method over gRPC.

    Pulls messages from the server.

    Returns:
        Callable[[~.PullRequest],
                ~.PullResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "pull"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Pull",
        request_serializer=pubsub.PullRequest.serialize,
        response_deserializer=pubsub.PullResponse.deserialize,
    )
    return self._stubs[stub_key]

567 

@property
def streaming_pull(
    self,
) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
    r"""Return a callable for the streaming pull method over gRPC.

    Establishes a stream with the server, which sends messages down
    to the client. The client streams acknowledgements and ack
    deadline modifications back to the server. On any error the server
    closes the stream and returns the status. The server may also close
    the stream with status ``UNAVAILABLE`` to reassign server-side
    resources, in which case the client should re-establish the
    stream. Flow control can be achieved by configuring the underlying
    RPC channel.

    Returns:
        Callable[[~.StreamingPullRequest],
                ~.StreamingPullResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "streaming_pull"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. Unlike the other
    # stubs here this one is stream-stream. gRPC handles serialization and
    # deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.stream_stream(
        "/google.pubsub.v1.Subscriber/StreamingPull",
        request_serializer=pubsub.StreamingPullRequest.serialize,
        response_deserializer=pubsub.StreamingPullResponse.deserialize,
    )
    return self._stubs[stub_key]

600 

@property
def modify_push_config(
    self,
) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
    r"""Return a callable for the modify push config method over gRPC.

    Modifies the ``PushConfig`` for a specified subscription.

    This may be used to change a push subscription to a pull one
    (signified by an empty ``PushConfig``) or vice versa, or change
    the endpoint URL and other attributes of a push subscription.
    Messages will accumulate for delivery continuously through the
    call regardless of changes to the ``PushConfig``.

    Returns:
        Callable[[~.ModifyPushConfigRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "modify_push_config"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyPushConfig",
        request_serializer=pubsub.ModifyPushConfigRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

632 

@property
def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable for the get snapshot method over gRPC.

    Gets the configuration details of a snapshot. Snapshots are used
    in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.GetSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "get_snapshot"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSnapshot",
        request_serializer=pubsub.GetSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return self._stubs[stub_key]

661 

@property
def list_snapshots(
    self,
) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
    r"""Return a callable for the list snapshots method over gRPC.

    Lists the existing snapshots. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.ListSnapshotsRequest],
                ~.ListSnapshotsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_snapshots"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSnapshots",
        request_serializer=pubsub.ListSnapshotsRequest.serialize,
        response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
    )
    return self._stubs[stub_key]

691 

@property
def create_snapshot(
    self,
) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable for the create snapshot method over gRPC.

    Creates a snapshot from the requested subscription. Snapshots
    are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.

    Returns ``ALREADY_EXISTS`` if the snapshot already exists and
    ``NOT_FOUND`` if the requested subscription doesn't exist. If the
    backlog in the subscription is too old -- and the resulting
    snapshot would expire in less than 1 hour -- then
    ``FAILED_PRECONDITION`` is returned. See also the
    ``Snapshot.expire_time`` field.

    When the request carries no name, the server assigns a random name
    for this snapshot on the same project as the subscription,
    conforming to the [resource name format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
    The generated name is populated in the returned Snapshot object.
    Note that for REST API requests, you must specify a name in the
    request.

    Returns:
        Callable[[~.CreateSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "create_snapshot"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSnapshot",
        request_serializer=pubsub.CreateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return self._stubs[stub_key]

735 

@property
def update_snapshot(
    self,
) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable for the update snapshot method over gRPC.

    Updates an existing snapshot by updating the fields specified in
    the update mask. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages
    in an existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.UpdateSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "update_snapshot"
    # Reuse the stub if this property has been read before.
    if stub_key in self._stubs:
        return self._stubs[stub_key]

    # First access: build the stub on the logged channel. gRPC handles
    # serialization and deserialization; only the functions are passed in.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSnapshot",
        request_serializer=pubsub.UpdateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return self._stubs[stub_key]

766 

@property
def delete_snapshot(
    self,
) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
    r"""Callable that invokes the ``DeleteSnapshot`` RPC over gRPC.

    The RPC removes an existing snapshot. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can
    be set to the state captured by a snapshot.

    When the snapshot is deleted, all messages retained in it are
    immediately dropped. A new snapshot may afterwards be created
    with the same name, but it has no association with the old
    snapshot or its subscription unless the same subscription is
    specified.

    The stub is built on first access and stored in ``self._stubs``;
    subsequent accesses return the stored object.

    Returns:
        Callable[[~.DeleteSnapshotRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "delete_snapshot"
    stub_cache = self._stubs
    if rpc_key not in stub_cache:
        # Built lazily. gRPC performs serialization/deserialization
        # itself, so only the codec functions are handed over.
        make_stub = self._logged_channel.unary_unary
        stub_cache[rpc_key] = make_stub(
            "/google.pubsub.v1.Subscriber/DeleteSnapshot",
            request_serializer=pubsub.DeleteSnapshotRequest.serialize,
            response_deserializer=empty_pb2.Empty.FromString,
        )
    return stub_cache[rpc_key]

801 

@property
def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
    r"""Callable that invokes the ``Seek`` RPC over gRPC.

    The RPC seeks an existing subscription to a point in time or to
    a given snapshot, whichever is provided in the request. Snapshots
    are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can
    be set to the state captured by a snapshot. Both the subscription
    and the snapshot must be on the same topic.

    The stub is built on first access and stored in ``self._stubs``;
    subsequent accesses return the stored object.

    Returns:
        Callable[[~.SeekRequest],
                ~.SeekResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "seek"
    stub_cache = self._stubs
    if rpc_key not in stub_cache:
        # Built lazily. gRPC performs serialization/deserialization
        # itself, so only the codec functions are handed over.
        make_stub = self._logged_channel.unary_unary
        stub_cache[rpc_key] = make_stub(
            "/google.pubsub.v1.Subscriber/Seek",
            request_serializer=pubsub.SeekRequest.serialize,
            response_deserializer=pubsub.SeekResponse.deserialize,
        )
    return stub_cache[rpc_key]

833 

def close(self):
    """Close the transport by closing its logged gRPC channel."""
    channel = self._logged_channel
    channel.close()

836 

@property
def set_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
    r"""Callable that invokes the IAM ``SetIamPolicy`` RPC over gRPC.

    The RPC sets the IAM access control policy on the specified
    target, replacing any existing policy.

    The stub is built on first access and stored in ``self._stubs``;
    subsequent accesses return the stored object.

    Returns:
        Callable[[~.SetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "set_iam_policy"
    stub_cache = self._stubs
    if rpc_key not in stub_cache:
        # Built lazily. gRPC performs serialization/deserialization
        # itself, so only the codec functions are handed over.
        make_stub = self._logged_channel.unary_unary
        stub_cache[rpc_key] = make_stub(
            "/google.iam.v1.IAMPolicy/SetIamPolicy",
            request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return stub_cache[rpc_key]

861 

@property
def get_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
    r"""Callable that invokes the IAM ``GetIamPolicy`` RPC over gRPC.

    The RPC gets the IAM access control policy for the specified
    target. An empty policy is returned if the target exists and
    does not have a policy set.

    The stub is built on first access and stored in ``self._stubs``;
    subsequent accesses return the stored object.

    Returns:
        Callable[[~.GetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "get_iam_policy"
    stub_cache = self._stubs
    if rpc_key not in stub_cache:
        # Built lazily. gRPC performs serialization/deserialization
        # itself, so only the codec functions are handed over.
        make_stub = self._logged_channel.unary_unary
        stub_cache[rpc_key] = make_stub(
            "/google.iam.v1.IAMPolicy/GetIamPolicy",
            request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
            response_deserializer=policy_pb2.Policy.FromString,
        )
    return stub_cache[rpc_key]

887 

@property
def test_iam_permissions(
    self,
) -> Callable[
    [iam_policy_pb2.TestIamPermissionsRequest],
    iam_policy_pb2.TestIamPermissionsResponse,
]:
    r"""Callable that invokes the IAM ``TestIamPermissions`` RPC over gRPC.

    The RPC tests the specified permissions against the IAM access
    control policy for the specified target. If the target does not
    exist, an empty set of permissions is returned rather than a
    NOT_FOUND error.

    The stub is built on first access and stored in ``self._stubs``;
    subsequent accesses return the stored object.

    Returns:
        Callable[[~.TestIamPermissionsRequest],
                ~.TestIamPermissionsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    rpc_key = "test_iam_permissions"
    stub_cache = self._stubs
    if rpc_key not in stub_cache:
        # Built lazily. gRPC performs serialization/deserialization
        # itself, so only the codec functions are handed over.
        make_stub = self._logged_channel.unary_unary
        stub_cache[rpc_key] = make_stub(
            "/google.iam.v1.IAMPolicy/TestIamPermissions",
            request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
            response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
        )
    return stub_cache[rpc_key]

916 

@property
def kind(self) -> str:
    """Return the identifier of this transport flavour, ``"grpc"``."""
    transport_kind = "grpc"
    return transport_kind

920 

921 

# Public API of this module: only the synchronous gRPC transport class is
# exported by ``from ... import *``.
__all__ = ("SubscriberGrpcTransport",)