Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/grpc.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

161 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json 

17import logging as std_logging 

18import pickle 

19import warnings 

20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

21 

22from google.api_core import grpc_helpers 

23from google.api_core import gapic_v1 

24import google.auth # type: ignore 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.auth.transport.grpc import SslCredentials # type: ignore 

27from google.protobuf.json_format import MessageToJson 

28import google.protobuf.message 

29 

30import grpc # type: ignore 

31import proto # type: ignore 

32 

33from google.iam.v1 import iam_policy_pb2 # type: ignore 

34from google.iam.v1 import policy_pb2 # type: ignore 

35from google.protobuf import empty_pb2 # type: ignore 

36from google.pubsub_v1.types import pubsub 

37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 

38 

# Record whether ``google.api_core.client_logging`` can be imported; the
# interceptor below only emits log records when this flag is True.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger used for request/response debug records.
_LOGGER = std_logging.getLogger(__name__)

47 

48 

class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    """Client interceptor that logs unary-unary requests and responses.

    Records are emitted on the module logger at DEBUG level, and only when
    ``CLIENT_LOGGING_SUPPORTED`` is true and DEBUG is enabled for that logger.
    """

    @staticmethod
    def _payload_for_log(message):
        """Return a printable representation of a request/response message.

        proto-plus messages and raw protobuf messages are rendered as JSON;
        any other object falls back to its type name plus its pickled bytes.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and the response when enabled.

        Args:
            continuation: Callable that proceeds with the RPC; it is invoked
                as ``continuation(client_call_details, request)``.
            client_call_details: Describes the outgoing RPC. Its ``method``
                and ``metadata`` attributes are read for logging.
            request: The request message for the RPC.

        Returns:
            The object returned by ``continuation``, unchanged.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` is optional on the call details, so treat ``None``
            # as "no metadata" instead of failing while building the log
            # record. Byte values are decoded so the record stays textual.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._payload_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Flatten the trailing metadata pairs into a plain dict of
            # strings; an empty or missing value is logged as None.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            # ``result()`` is called on the returned call object to obtain
            # the response message for the log record.
            result = response.result()
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.pubsub.v1.Subscriber",
                    # Stringified to match the request-side record.
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

112 

113 

114class SubscriberGrpcTransport(SubscriberTransport): 

115 """gRPC backend transport for Subscriber. 

116 

117 The service that an application uses to manipulate subscriptions and 

118 to consume messages from a subscription via the ``Pull`` method or 

119 by establishing a bi-directional stream using the ``StreamingPull`` 

120 method. 

121 

122 This class defines the same methods as the primary client, so the 

123 primary client can load the underlying transport implementation 

124 and call it. 

125 

126 It sends protocol buffers over the wire using gRPC (which is built on 

127 top of HTTP/2); the ``grpcio`` package must be installed. 

128 """ 

129 

130 _stubs: Dict[str, Callable] 

131 

def __init__(
    self,
    *,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
    api_mtls_endpoint: Optional[str] = None,
    client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    api_audience: Optional[str] = None,
) -> None:
    """Instantiate the transport.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'pubsub.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. They identify
            the application to the service; when omitted, the client tries
            to ascertain credentials from the environment.
            Ignored if a ``channel`` instance is provided.
        credentials_file (Optional[str]): Deprecated. A file with
            credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`.
            Ignored if a ``channel`` instance is provided. This argument
            will be removed in the next major version of this library.
        scopes (Optional(Sequence[str])): A list of scopes. Ignored if a
            ``channel`` instance is provided.
        channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
            A ``Channel`` instance through which to make calls, or a
            Callable that constructs and returns one. When None,
            ``self.create_channel`` builds the channel. A Callable is
            invoked with the same arguments as ``self.create_channel``.
        api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
            endpoint. If provided, it overrides ``host`` and a mutual TLS
            channel is attempted with client SSL credentials from
            ``client_cert_source`` or application default SSL credentials.
        client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
            Deprecated. A callback providing client SSL certificate bytes
            and private key bytes, both in PEM format. Ignored if
            ``api_mtls_endpoint`` is None.
        ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
            for the grpc channel. Ignored if a ``channel`` instance is
            provided.
        client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
            A callback providing client certificate bytes and private key
            bytes, both in PEM format, used to configure a mutual TLS
            channel. Ignored if a ``channel`` instance or
            ``ssl_channel_credentials`` is provided.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally only needed when developing your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT
            should be used for service account credentials.
        api_audience (Optional[str]): Not interpreted here; forwarded
            unchanged to the base transport initializer.

    Raises:
        google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
            creation failed for any reason.
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    self._grpc_channel = None
    self._ssl_channel_credentials = ssl_channel_credentials
    self._stubs: Dict[str, Callable] = {}

    # The legacy mTLS arguments still work but are flagged as deprecated.
    if api_mtls_endpoint:
        warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
    if client_cert_source:
        warnings.warn("client_cert_source is deprecated", DeprecationWarning)

    if isinstance(channel, grpc.Channel):
        # A ready-made channel wins: adopt it and drop any credentials.
        self._grpc_channel = channel
        self._ssl_channel_credentials = None
        self._ignore_credentials = True
        credentials = None
    elif api_mtls_endpoint:
        host = api_mtls_endpoint

        # SSL credentials come from the deprecated callback when given,
        # otherwise from the application default SSL credentials.
        if client_cert_source:
            cert, key = client_cert_source()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        else:
            self._ssl_channel_credentials = SslCredentials().ssl_credentials
    elif client_cert_source_for_mtls and not ssl_channel_credentials:
        # Explicit ``ssl_channel_credentials`` take precedence over the
        # mTLS certificate callback.
        cert, key = client_cert_source_for_mtls()
        self._ssl_channel_credentials = grpc.ssl_channel_credentials(
            certificate_chain=cert, private_key=key
        )

    # The base transport sets the host, credentials and scopes.
    super().__init__(
        host=host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

    if not self._grpc_channel:
        channel_options = [
            ("grpc.max_send_message_length", -1),
            ("grpc.max_receive_message_length", -1),
            ("grpc.max_metadata_size", 4 * 1024 * 1024),
            ("grpc.keepalive_time_ms", 30000),
        ]
        # Use the caller's factory when one was supplied, else the default.
        channel_factory = channel or type(self).create_channel
        self._grpc_channel = channel_factory(
            self._host,
            # Reuse the credentials the base transport saved; that is why
            # ``credentials_file`` is deliberately passed as None here.
            credentials=self._credentials,
            credentials_file=None,
            scopes=self._scopes,
            ssl_credentials=self._ssl_channel_credentials,
            quota_project_id=quota_project_id,
            options=channel_options,
        )

    # All stubs are created on the intercepted channel so calls get logged.
    self._interceptor = _LoggingClientInterceptor()
    self._logged_channel = grpc.intercept_channel(
        self._grpc_channel, self._interceptor
    )

    # Wrap messages. This must be done after self._logged_channel exists.
    self._prep_wrapped_messages(client_info)

279 

@classmethod
def create_channel(
    cls,
    host: str = "pubsub.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    quota_project_id: Optional[str] = None,
    **kwargs,
) -> grpc.Channel:
    """Create and return a gRPC channel object.

    Args:
        host (Optional[str]): The host for the channel to use.
        credentials (Optional[~.Credentials]): The authorization
            credentials to attach to requests. They identify this
            application to the service. When omitted, the client tries to
            ascertain the credentials from the environment.
        credentials_file (Optional[str]): Deprecated. A file with
            credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. Mutually
            exclusive with ``credentials``. This argument will be removed
            in the next major version of this library.
        scopes (Optional[Sequence[str]]): An optional list of scopes needed
            for this service. Only used when credentials are not specified;
            they are passed to :func:`google.auth.default`.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        kwargs (Optional[dict]): Keyword arguments, which are passed to the
            channel creation.

    Returns:
        grpc.Channel: A gRPC channel object.

    Raises:
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    # Class-level defaults (scopes and host) accompany the caller's values.
    new_channel = grpc_helpers.create_channel(
        host,
        default_host=cls.DEFAULT_HOST,
        default_scopes=cls.AUTH_SCOPES,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        **kwargs,
    )
    return new_channel

327 

@property
def grpc_channel(self) -> grpc.Channel:
    """The gRPC channel this transport uses to reach the service."""
    underlying_channel = self._grpc_channel
    return underlying_channel

332 

@property
def create_subscription(
    self,
) -> Callable[[pubsub.Subscription], pubsub.Subscription]:
    r"""Return a callable that invokes ``CreateSubscription`` over gRPC.

    The RPC creates a subscription to a given topic. See the [resource
    name rules]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names).
    It returns ``ALREADY_EXISTS`` if the subscription already exists and
    ``NOT_FOUND`` if the corresponding topic doesn't exist.

    If the request carries no name, the server assigns a random name for
    the subscription in the same project as the topic, conforming to the
    [resource name format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names),
    and populates it in the returned Subscription object. For REST API
    requests a name must be specified in the request.

    Returns:
        Callable[[~.Subscription],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "create_subscription" in stubs:
        return stubs["create_subscription"]
    stubs["create_subscription"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSubscription",
        request_serializer=pubsub.Subscription.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return stubs["create_subscription"]

370 

@property
def get_subscription(
    self,
) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]:
    r"""Return a callable that invokes ``GetSubscription`` over gRPC.

    The RPC gets the configuration details of a subscription.

    Returns:
        Callable[[~.GetSubscriptionRequest],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "get_subscription" in stubs:
        return stubs["get_subscription"]
    stubs["get_subscription"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSubscription",
        request_serializer=pubsub.GetSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return stubs["get_subscription"]

396 

@property
def update_subscription(
    self,
) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]:
    r"""Return a callable that invokes ``UpdateSubscription`` over gRPC.

    The RPC updates an existing subscription by updating the fields
    specified in the update mask. Certain properties of a subscription,
    such as its topic, are not modifiable.

    Returns:
        Callable[[~.UpdateSubscriptionRequest],
                ~.Subscription]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "update_subscription" in stubs:
        return stubs["update_subscription"]
    stubs["update_subscription"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSubscription",
        request_serializer=pubsub.UpdateSubscriptionRequest.serialize,
        response_deserializer=pubsub.Subscription.deserialize,
    )
    return stubs["update_subscription"]

425 

@property
def list_subscriptions(
    self,
) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]:
    r"""Return a callable that invokes ``ListSubscriptions`` over gRPC.

    The RPC lists matching subscriptions.

    Returns:
        Callable[[~.ListSubscriptionsRequest],
                ~.ListSubscriptionsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "list_subscriptions" in stubs:
        return stubs["list_subscriptions"]
    stubs["list_subscriptions"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSubscriptions",
        request_serializer=pubsub.ListSubscriptionsRequest.serialize,
        response_deserializer=pubsub.ListSubscriptionsResponse.deserialize,
    )
    return stubs["list_subscriptions"]

451 

@property
def delete_subscription(
    self,
) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]:
    r"""Return a callable that invokes ``DeleteSubscription`` over gRPC.

    The RPC deletes an existing subscription. All messages retained in
    the subscription are immediately dropped, and calls to ``Pull`` after
    deletion will return ``NOT_FOUND``. After a subscription is deleted,
    a new one may be created with the same name, but the new one has no
    association with the old subscription or its topic unless the same
    topic is specified.

    Returns:
        Callable[[~.DeleteSubscriptionRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "delete_subscription" in stubs:
        return stubs["delete_subscription"]
    stubs["delete_subscription"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSubscription",
        request_serializer=pubsub.DeleteSubscriptionRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["delete_subscription"]

482 

@property
def modify_ack_deadline(
    self,
) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]:
    r"""Return a callable that invokes ``ModifyAckDeadline`` over gRPC.

    The RPC modifies the ack deadline for a specific message. It is
    useful to indicate that the subscriber needs more time to process a
    message, or to make the message available for redelivery if the
    processing was interrupted. It does not modify the
    subscription-level ``ackDeadlineSeconds`` used for subsequent
    messages.

    Returns:
        Callable[[~.ModifyAckDeadlineRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "modify_ack_deadline" in stubs:
        return stubs["modify_ack_deadline"]
    stubs["modify_ack_deadline"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyAckDeadline",
        request_serializer=pubsub.ModifyAckDeadlineRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["modify_ack_deadline"]

513 

@property
def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]:
    r"""Return a callable that invokes ``Acknowledge`` over gRPC.

    The RPC acknowledges the messages associated with the ``ack_ids`` in
    the ``AcknowledgeRequest``. The Pub/Sub system can remove the
    relevant messages from the subscription.

    Acknowledging a message whose ack deadline has expired may succeed,
    but such a message may be redelivered later. Acknowledging a message
    more than once will not result in an error.

    Returns:
        Callable[[~.AcknowledgeRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "acknowledge" in stubs:
        return stubs["acknowledge"]
    stubs["acknowledge"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Acknowledge",
        request_serializer=pubsub.AcknowledgeRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["acknowledge"]

544 

@property
def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]:
    r"""Return a callable that invokes ``Pull`` over gRPC.

    The RPC pulls messages from the server.

    Returns:
        Callable[[~.PullRequest],
                ~.PullResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "pull" in stubs:
        return stubs["pull"]
    stubs["pull"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Pull",
        request_serializer=pubsub.PullRequest.serialize,
        response_deserializer=pubsub.PullResponse.deserialize,
    )
    return stubs["pull"]

568 

@property
def streaming_pull(
    self,
) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]:
    r"""Return a callable that invokes ``StreamingPull`` over gRPC.

    The RPC establishes a stream with the server, which sends messages
    down to the client. The client streams acknowledgments and ack
    deadline modifications back to the server. The server will close the
    stream and return the status on any error. It may also close the
    stream with status ``UNAVAILABLE`` to reassign server-side resources,
    in which case the client should re-establish the stream. Flow control
    can be achieved by configuring the underlying RPC channel.

    Returns:
        Callable[[~.StreamingPullRequest],
                ~.StreamingPullResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # Unlike its siblings this is a stream-stream stub. gRPC handles
    # serialization and deserialization, so only the functions for each
    # direction need to be supplied.
    stubs = self._stubs
    if "streaming_pull" in stubs:
        return stubs["streaming_pull"]
    stubs["streaming_pull"] = self._logged_channel.stream_stream(
        "/google.pubsub.v1.Subscriber/StreamingPull",
        request_serializer=pubsub.StreamingPullRequest.serialize,
        response_deserializer=pubsub.StreamingPullResponse.deserialize,
    )
    return stubs["streaming_pull"]

601 

@property
def modify_push_config(
    self,
) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]:
    r"""Return a callable that invokes ``ModifyPushConfig`` over gRPC.

    The RPC modifies the ``PushConfig`` for a specified subscription.

    It may be used to change a push subscription to a pull one
    (signified by an empty ``PushConfig``) or vice versa, or to change
    the endpoint URL and other attributes of a push subscription.
    Messages will accumulate for delivery continuously through the call
    regardless of changes to the ``PushConfig``.

    Returns:
        Callable[[~.ModifyPushConfigRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "modify_push_config" in stubs:
        return stubs["modify_push_config"]
    stubs["modify_push_config"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ModifyPushConfig",
        request_serializer=pubsub.ModifyPushConfigRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return stubs["modify_push_config"]

633 

@property
def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable that invokes ``GetSnapshot`` over gRPC.

    The RPC gets the configuration details of a snapshot. Snapshots are
    used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages in an
    existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.GetSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "get_snapshot" in stubs:
        return stubs["get_snapshot"]
    stubs["get_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/GetSnapshot",
        request_serializer=pubsub.GetSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return stubs["get_snapshot"]

662 

@property
def list_snapshots(
    self,
) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]:
    r"""Return a callable that invokes ``ListSnapshots`` over gRPC.

    The RPC lists the existing snapshots. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages in an
    existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.ListSnapshotsRequest],
                ~.ListSnapshotsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "list_snapshots" in stubs:
        return stubs["list_snapshots"]
    stubs["list_snapshots"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/ListSnapshots",
        request_serializer=pubsub.ListSnapshotsRequest.serialize,
        response_deserializer=pubsub.ListSnapshotsResponse.deserialize,
    )
    return stubs["list_snapshots"]

692 

@property
def create_snapshot(
    self,
) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable that invokes ``CreateSnapshot`` over gRPC.

    The RPC creates a snapshot from the requested subscription. Snapshots
    are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages in an
    existing subscription to the state captured by a snapshot.

    Error cases: ``ALREADY_EXISTS`` is returned if the snapshot already
    exists, and ``NOT_FOUND`` if the requested subscription doesn't
    exist. If the backlog in the subscription is too old -- and the
    resulting snapshot would expire in less than 1 hour -- then
    ``FAILED_PRECONDITION`` is returned. See also the
    ``Snapshot.expire_time`` field.

    If the request carries no name, the server assigns a random name for
    the snapshot in the same project as the subscription, conforming to
    the [resource name format]
    (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names),
    and populates it in the returned Snapshot object. For REST API
    requests a name must be specified in the request.

    Returns:
        Callable[[~.CreateSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "create_snapshot" in stubs:
        return stubs["create_snapshot"]
    stubs["create_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/CreateSnapshot",
        request_serializer=pubsub.CreateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return stubs["create_snapshot"]

736 

@property
def update_snapshot(
    self,
) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]:
    r"""Return a callable that invokes ``UpdateSnapshot`` over gRPC.

    The RPC updates an existing snapshot by updating the fields specified
    in the update mask. Snapshots are used in
    `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__
    operations, which allow you to manage message acknowledgments in
    bulk. That is, you can set the acknowledgment state of messages in an
    existing subscription to the state captured by a snapshot.

    Returns:
        Callable[[~.UpdateSnapshotRequest],
                ~.Snapshot]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    # The stub is built on first access and cached in ``self._stubs``.
    # gRPC handles serialization and deserialization, so only the
    # functions for each direction need to be supplied.
    stubs = self._stubs
    if "update_snapshot" in stubs:
        return stubs["update_snapshot"]
    stubs["update_snapshot"] = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/UpdateSnapshot",
        request_serializer=pubsub.UpdateSnapshotRequest.serialize,
        response_deserializer=pubsub.Snapshot.deserialize,
    )
    return stubs["update_snapshot"]

767 

@property
def delete_snapshot(
    self,
) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]:
    r"""Return a callable for the delete snapshot method over gRPC.

    The underlying RPC removes an existing snapshot. Snapshots are used
    in [Seek] (https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can
    be set to the state captured by a snapshot.

    When the snapshot is deleted, all messages retained in it are
    immediately dropped. Afterwards a new snapshot may be created with
    the same name, but it has no association with the old snapshot or
    its subscription, unless the same subscription is specified.

    Returns:
        Callable[[~.DeleteSnapshotRequest],
                ~.Empty]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "delete_snapshot"

    # Reuse the stub if an earlier access already created it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # First access: build the stub from the logged channel. gRPC performs
    # serialization and deserialization itself, so only the two
    # conversion functions are handed over.
    rpc = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/DeleteSnapshot",
        request_serializer=pubsub.DeleteSnapshotRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    self._stubs[stub_name] = rpc
    return self._stubs[stub_name]

802 

@property
def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]:
    r"""Return a callable for the seek method over gRPC.

    The underlying RPC seeks an existing subscription to a point in
    time or to a given snapshot, whichever the request provides.
    Snapshots are used in [Seek]
    (https://cloud.google.com/pubsub/docs/replay-overview)
    operations, which manage message acknowledgments in bulk: the
    acknowledgment state of messages in an existing subscription can
    be set to the state captured by a snapshot. The subscription and
    the snapshot must both be on the same topic.

    Returns:
        Callable[[~.SeekRequest],
                ~.SeekResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "seek"

    # Reuse the stub if an earlier access already created it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # First access: build the stub from the logged channel. gRPC performs
    # serialization and deserialization itself, so only the two
    # conversion functions are handed over.
    rpc = self._logged_channel.unary_unary(
        "/google.pubsub.v1.Subscriber/Seek",
        request_serializer=pubsub.SeekRequest.serialize,
        response_deserializer=pubsub.SeekResponse.deserialize,
    )
    self._stubs[stub_name] = rpc
    return self._stubs[stub_name]

834 

def close(self):
    """Close the channel stored in ``self._logged_channel``."""
    channel = self._logged_channel
    channel.close()

837 

@property
def set_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]:
    r"""Return a callable for the set iam policy method over gRPC.

    The underlying RPC sets the IAM access control policy on the
    specified resource, replacing any existing policy.

    Returns:
        Callable[[~.SetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "set_iam_policy"

    # Reuse the stub if an earlier access already created it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # First access: build the stub from the logged channel. gRPC performs
    # serialization and deserialization itself, so only the two
    # conversion functions are handed over.
    rpc = self._logged_channel.unary_unary(
        "/google.iam.v1.IAMPolicy/SetIamPolicy",
        request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
        response_deserializer=policy_pb2.Policy.FromString,
    )
    self._stubs[stub_name] = rpc
    return self._stubs[stub_name]

862 

@property
def get_iam_policy(
    self,
) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]:
    r"""Return a callable for the get iam policy method over gRPC.

    The underlying RPC gets the IAM access control policy for a
    resource. An empty policy is returned if the resource exists and
    does not have a policy set.

    Returns:
        Callable[[~.GetIamPolicyRequest],
                ~.Policy]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "get_iam_policy"

    # Reuse the stub if an earlier access already created it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # First access: build the stub from the logged channel. gRPC performs
    # serialization and deserialization itself, so only the two
    # conversion functions are handed over.
    rpc = self._logged_channel.unary_unary(
        "/google.iam.v1.IAMPolicy/GetIamPolicy",
        request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
        response_deserializer=policy_pb2.Policy.FromString,
    )
    self._stubs[stub_name] = rpc
    return self._stubs[stub_name]

888 

@property
def test_iam_permissions(
    self,
) -> Callable[
    [iam_policy_pb2.TestIamPermissionsRequest],
    iam_policy_pb2.TestIamPermissionsResponse,
]:
    r"""Return a callable for the test iam permissions method over gRPC.

    The underlying RPC tests the specified permissions against the IAM
    access control policy for a resource. If the resource does not
    exist, this returns an empty set of permissions, not a NOT_FOUND
    error.

    Returns:
        Callable[[~.TestIamPermissionsRequest],
                ~.TestIamPermissionsResponse]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_name = "test_iam_permissions"

    # Reuse the stub if an earlier access already created it.
    if stub_name in self._stubs:
        return self._stubs[stub_name]

    # First access: build the stub from the logged channel. gRPC performs
    # serialization and deserialization itself, so only the two
    # conversion functions are handed over.
    rpc = self._logged_channel.unary_unary(
        "/google.iam.v1.IAMPolicy/TestIamPermissions",
        request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
        response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
    )
    self._stubs[stub_name] = rpc
    return self._stubs[stub_name]

917 

@property
def kind(self) -> str:
    """Return the identifier string for this transport; always ``"grpc"``."""
    transport_kind = "grpc"
    return transport_kind

921 

922 

# Names exported by a star-import of this module.
__all__ = (
    "SubscriberGrpcTransport",
)