Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/pubsub_v1/services/subscriber/transports/grpc.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

161 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import json 

17import logging as std_logging 

18import pickle 

19import warnings 

20from typing import Callable, Dict, Optional, Sequence, Tuple, Union 

21 

22from google.api_core import grpc_helpers 

23from google.api_core import gapic_v1 

24import google.auth # type: ignore 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.auth.transport.grpc import SslCredentials # type: ignore 

27from google.protobuf.json_format import MessageToJson 

28import google.protobuf.message 

29 

30import grpc # type: ignore 

31import proto # type: ignore 

32 

33from google.iam.v1 import iam_policy_pb2 # type: ignore 

34from google.iam.v1 import policy_pb2 # type: ignore 

35from google.protobuf import empty_pb2 # type: ignore 

36from google.pubsub_v1.types import pubsub 

37from .base import SubscriberTransport, DEFAULT_CLIENT_INFO 

38 

39try: 

40 from google.api_core import client_logging # type: ignore 

41 

42 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

43except ImportError: # pragma: NO COVER 

44 CLIENT_LOGGING_SUPPORTED = False 

45 

46_LOGGER = std_logging.getLogger(__name__) 

47 

48 

49class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER 

50 def intercept_unary_unary(self, continuation, client_call_details, request): 

51 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

52 std_logging.DEBUG 

53 ) 

54 if logging_enabled: # pragma: NO COVER 

55 request_metadata = client_call_details.metadata 

56 if isinstance(request, proto.Message): 

57 request_payload = type(request).to_json(request) 

58 elif isinstance(request, google.protobuf.message.Message): 

59 request_payload = MessageToJson(request) 

60 else: 

61 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" 

62 

63 request_metadata = { 

64 key: value.decode("utf-8") if isinstance(value, bytes) else value 

65 for key, value in request_metadata 

66 } 

67 grpc_request = { 

68 "payload": request_payload, 

69 "requestMethod": "grpc", 

70 "metadata": dict(request_metadata), 

71 } 

72 _LOGGER.debug( 

73 f"Sending request for {client_call_details.method}", 

74 extra={ 

75 "serviceName": "google.pubsub.v1.Subscriber", 

76 "rpcName": str(client_call_details.method), 

77 "request": grpc_request, 

78 "metadata": grpc_request["metadata"], 

79 }, 

80 ) 

81 response = continuation(client_call_details, request) 

82 if logging_enabled: # pragma: NO COVER 

83 response_metadata = response.trailing_metadata() 

84 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples 

85 metadata = ( 

86 dict([(k, str(v)) for k, v in response_metadata]) 

87 if response_metadata 

88 else None 

89 ) 

90 result = response.result() 

91 if isinstance(result, proto.Message): 

92 response_payload = type(result).to_json(result) 

93 elif isinstance(result, google.protobuf.message.Message): 

94 response_payload = MessageToJson(result) 

95 else: 

96 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}" 

97 grpc_response = { 

98 "payload": response_payload, 

99 "metadata": metadata, 

100 "status": "OK", 

101 } 

102 _LOGGER.debug( 

103 f"Received response for {client_call_details.method}.", 

104 extra={ 

105 "serviceName": "google.pubsub.v1.Subscriber", 

106 "rpcName": client_call_details.method, 

107 "response": grpc_response, 

108 "metadata": grpc_response["metadata"], 

109 }, 

110 ) 

111 return response 

112 

113 

114class SubscriberGrpcTransport(SubscriberTransport): 

115 """gRPC backend transport for Subscriber. 

116 

117 The service that an application uses to manipulate subscriptions and 

118 to consume messages from a subscription via the ``Pull`` method or 

119 by establishing a bi-directional stream using the ``StreamingPull`` 

120 method. 

121 

122 This class defines the same methods as the primary client, so the 

123 primary client can load the underlying transport implementation 

124 and call it. 

125 

126 It sends protocol buffers over the wire using gRPC (which is built on 

127 top of HTTP/2); the ``grpcio`` package must be installed. 

128 """ 

129 

130 _stubs: Dict[str, Callable] 

131 

132 def __init__( 

133 self, 

134 *, 

135 host: str = "pubsub.googleapis.com", 

136 credentials: Optional[ga_credentials.Credentials] = None, 

137 credentials_file: Optional[str] = None, 

138 scopes: Optional[Sequence[str]] = None, 

139 channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None, 

140 api_mtls_endpoint: Optional[str] = None, 

141 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

142 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 

143 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

144 quota_project_id: Optional[str] = None, 

145 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

146 always_use_jwt_access: Optional[bool] = False, 

147 api_audience: Optional[str] = None, 

148 ) -> None: 

149 """Instantiate the transport. 

150 

151 Args: 

152 host (Optional[str]): 

153 The hostname to connect to (default: 'pubsub.googleapis.com'). 

154 credentials (Optional[google.auth.credentials.Credentials]): The 

155 authorization credentials to attach to requests. These 

156 credentials identify the application to the service; if none 

157 are specified, the client will attempt to ascertain the 

158 credentials from the environment. 

159 This argument is ignored if a ``channel`` instance is provided. 

160 credentials_file (Optional[str]): A file with credentials that can 

161 be loaded with :func:`google.auth.load_credentials_from_file`. 

162 This argument is ignored if a ``channel`` instance is provided. 

163 scopes (Optional(Sequence[str])): A list of scopes. This argument is 

164 ignored if a ``channel`` instance is provided. 

165 channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]): 

166 A ``Channel`` instance through which to make calls, or a Callable 

167 that constructs and returns one. If set to None, ``self.create_channel`` 

168 is used to create the channel. If a Callable is given, it will be called 

169 with the same arguments as used in ``self.create_channel``. 

170 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 

171 If provided, it overrides the ``host`` argument and tries to create 

172 a mutual TLS channel with client SSL credentials from 

173 ``client_cert_source`` or application default SSL credentials. 

174 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 

175 Deprecated. A callback to provide client SSL certificate bytes and 

176 private key bytes, both in PEM format. It is ignored if 

177 ``api_mtls_endpoint`` is None. 

178 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 

179 for the grpc channel. It is ignored if a ``channel`` instance is provided. 

180 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 

181 A callback to provide client certificate bytes and private key bytes, 

182 both in PEM format. It is used to configure a mutual TLS channel. It is 

183 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided. 

184 quota_project_id (Optional[str]): An optional project to use for billing 

185 and quota. 

186 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

187 The client info used to send a user-agent string along with 

188 API requests. If ``None``, then default info will be used. 

189 Generally, you only need to set this if you're developing 

190 your own client library. 

191 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

192 be used for service account credentials. 

193 

194 Raises: 

195 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport 

196 creation failed for any reason. 

197 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

198 and ``credentials_file`` are passed. 

199 """ 

200 self._grpc_channel = None 

201 self._ssl_channel_credentials = ssl_channel_credentials 

202 self._stubs: Dict[str, Callable] = {} 

203 

204 if api_mtls_endpoint: 

205 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 

206 if client_cert_source: 

207 warnings.warn("client_cert_source is deprecated", DeprecationWarning) 

208 

209 if isinstance(channel, grpc.Channel): 

210 # Ignore credentials if a channel was passed. 

211 credentials = None 

212 self._ignore_credentials = True 

213 # If a channel was explicitly provided, set it. 

214 self._grpc_channel = channel 

215 self._ssl_channel_credentials = None 

216 

217 else: 

218 if api_mtls_endpoint: 

219 host = api_mtls_endpoint 

220 

221 # Create SSL credentials with client_cert_source or application 

222 # default SSL credentials. 

223 if client_cert_source: 

224 cert, key = client_cert_source() 

225 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

226 certificate_chain=cert, private_key=key 

227 ) 

228 else: 

229 self._ssl_channel_credentials = SslCredentials().ssl_credentials 

230 

231 else: 

232 if client_cert_source_for_mtls and not ssl_channel_credentials: 

233 cert, key = client_cert_source_for_mtls() 

234 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

235 certificate_chain=cert, private_key=key 

236 ) 

237 

238 # The base transport sets the host, credentials and scopes 

239 super().__init__( 

240 host=host, 

241 credentials=credentials, 

242 credentials_file=credentials_file, 

243 scopes=scopes, 

244 quota_project_id=quota_project_id, 

245 client_info=client_info, 

246 always_use_jwt_access=always_use_jwt_access, 

247 api_audience=api_audience, 

248 ) 

249 

250 if not self._grpc_channel: 

251 # initialize with the provided callable or the default channel 

252 channel_init = channel or type(self).create_channel 

253 self._grpc_channel = channel_init( 

254 self._host, 

255 # use the credentials which are saved 

256 credentials=self._credentials, 

257 # Set ``credentials_file`` to ``None`` here as 

258 # the credentials that we saved earlier should be used. 

259 credentials_file=None, 

260 scopes=self._scopes, 

261 ssl_credentials=self._ssl_channel_credentials, 

262 quota_project_id=quota_project_id, 

263 options=[ 

264 ("grpc.max_send_message_length", -1), 

265 ("grpc.max_receive_message_length", -1), 

266 ("grpc.max_metadata_size", 4 * 1024 * 1024), 

267 ("grpc.keepalive_time_ms", 30000), 

268 ], 

269 ) 

270 

271 self._interceptor = _LoggingClientInterceptor() 

272 self._logged_channel = grpc.intercept_channel( 

273 self._grpc_channel, self._interceptor 

274 ) 

275 

276 # Wrap messages. This must be done after self._logged_channel exists 

277 self._prep_wrapped_messages(client_info) 

278 

279 @classmethod 

280 def create_channel( 

281 cls, 

282 host: str = "pubsub.googleapis.com", 

283 credentials: Optional[ga_credentials.Credentials] = None, 

284 credentials_file: Optional[str] = None, 

285 scopes: Optional[Sequence[str]] = None, 

286 quota_project_id: Optional[str] = None, 

287 **kwargs, 

288 ) -> grpc.Channel: 

289 """Create and return a gRPC channel object. 

290 Args: 

291 host (Optional[str]): The host for the channel to use. 

292 credentials (Optional[~.Credentials]): The 

293 authorization credentials to attach to requests. These 

294 credentials identify this application to the service. If 

295 none are specified, the client will attempt to ascertain 

296 the credentials from the environment. 

297 credentials_file (Optional[str]): A file with credentials that can 

298 be loaded with :func:`google.auth.load_credentials_from_file`. 

299 This argument is mutually exclusive with credentials. 

300 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

301 service. These are only used when credentials are not specified and 

302 are passed to :func:`google.auth.default`. 

303 quota_project_id (Optional[str]): An optional project to use for billing 

304 and quota. 

305 kwargs (Optional[dict]): Keyword arguments, which are passed to the 

306 channel creation. 

307 Returns: 

308 grpc.Channel: A gRPC channel object. 

309 

310 Raises: 

311 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

312 and ``credentials_file`` are passed. 

313 """ 

314 

315 return grpc_helpers.create_channel( 

316 host, 

317 credentials=credentials, 

318 credentials_file=credentials_file, 

319 quota_project_id=quota_project_id, 

320 default_scopes=cls.AUTH_SCOPES, 

321 scopes=scopes, 

322 default_host=cls.DEFAULT_HOST, 

323 **kwargs, 

324 ) 

325 

326 @property 

327 def grpc_channel(self) -> grpc.Channel: 

328 """Return the channel designed to connect to this service.""" 

329 return self._grpc_channel 

330 

331 @property 

332 def create_subscription( 

333 self, 

334 ) -> Callable[[pubsub.Subscription], pubsub.Subscription]: 

335 r"""Return a callable for the create subscription method over gRPC. 

336 

337 Creates a subscription to a given topic. See the [resource name 

338 rules] 

339 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

340 If the subscription already exists, returns ``ALREADY_EXISTS``. 

341 If the corresponding topic doesn't exist, returns ``NOT_FOUND``. 

342 

343 If the name is not provided in the request, the server will 

344 assign a random name for this subscription on the same project 

345 as the topic, conforming to the [resource name format] 

346 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

347 The generated name is populated in the returned Subscription 

348 object. Note that for REST API requests, you must specify a name 

349 in the request. 

350 

351 Returns: 

352 Callable[[~.Subscription], 

353 ~.Subscription]: 

354 A function that, when called, will call the underlying RPC 

355 on the server. 

356 """ 

357 # Generate a "stub function" on-the-fly which will actually make 

358 # the request. 

359 # gRPC handles serialization and deserialization, so we just need 

360 # to pass in the functions for each. 

361 if "create_subscription" not in self._stubs: 

362 self._stubs["create_subscription"] = self._logged_channel.unary_unary( 

363 "/google.pubsub.v1.Subscriber/CreateSubscription", 

364 request_serializer=pubsub.Subscription.serialize, 

365 response_deserializer=pubsub.Subscription.deserialize, 

366 ) 

367 return self._stubs["create_subscription"] 

368 

369 @property 

370 def get_subscription( 

371 self, 

372 ) -> Callable[[pubsub.GetSubscriptionRequest], pubsub.Subscription]: 

373 r"""Return a callable for the get subscription method over gRPC. 

374 

375 Gets the configuration details of a subscription. 

376 

377 Returns: 

378 Callable[[~.GetSubscriptionRequest], 

379 ~.Subscription]: 

380 A function that, when called, will call the underlying RPC 

381 on the server. 

382 """ 

383 # Generate a "stub function" on-the-fly which will actually make 

384 # the request. 

385 # gRPC handles serialization and deserialization, so we just need 

386 # to pass in the functions for each. 

387 if "get_subscription" not in self._stubs: 

388 self._stubs["get_subscription"] = self._logged_channel.unary_unary( 

389 "/google.pubsub.v1.Subscriber/GetSubscription", 

390 request_serializer=pubsub.GetSubscriptionRequest.serialize, 

391 response_deserializer=pubsub.Subscription.deserialize, 

392 ) 

393 return self._stubs["get_subscription"] 

394 

395 @property 

396 def update_subscription( 

397 self, 

398 ) -> Callable[[pubsub.UpdateSubscriptionRequest], pubsub.Subscription]: 

399 r"""Return a callable for the update subscription method over gRPC. 

400 

401 Updates an existing subscription by updating the 

402 fields specified in the update mask. Note that certain 

403 properties of a subscription, such as its topic, are not 

404 modifiable. 

405 

406 Returns: 

407 Callable[[~.UpdateSubscriptionRequest], 

408 ~.Subscription]: 

409 A function that, when called, will call the underlying RPC 

410 on the server. 

411 """ 

412 # Generate a "stub function" on-the-fly which will actually make 

413 # the request. 

414 # gRPC handles serialization and deserialization, so we just need 

415 # to pass in the functions for each. 

416 if "update_subscription" not in self._stubs: 

417 self._stubs["update_subscription"] = self._logged_channel.unary_unary( 

418 "/google.pubsub.v1.Subscriber/UpdateSubscription", 

419 request_serializer=pubsub.UpdateSubscriptionRequest.serialize, 

420 response_deserializer=pubsub.Subscription.deserialize, 

421 ) 

422 return self._stubs["update_subscription"] 

423 

424 @property 

425 def list_subscriptions( 

426 self, 

427 ) -> Callable[[pubsub.ListSubscriptionsRequest], pubsub.ListSubscriptionsResponse]: 

428 r"""Return a callable for the list subscriptions method over gRPC. 

429 

430 Lists matching subscriptions. 

431 

432 Returns: 

433 Callable[[~.ListSubscriptionsRequest], 

434 ~.ListSubscriptionsResponse]: 

435 A function that, when called, will call the underlying RPC 

436 on the server. 

437 """ 

438 # Generate a "stub function" on-the-fly which will actually make 

439 # the request. 

440 # gRPC handles serialization and deserialization, so we just need 

441 # to pass in the functions for each. 

442 if "list_subscriptions" not in self._stubs: 

443 self._stubs["list_subscriptions"] = self._logged_channel.unary_unary( 

444 "/google.pubsub.v1.Subscriber/ListSubscriptions", 

445 request_serializer=pubsub.ListSubscriptionsRequest.serialize, 

446 response_deserializer=pubsub.ListSubscriptionsResponse.deserialize, 

447 ) 

448 return self._stubs["list_subscriptions"] 

449 

450 @property 

451 def delete_subscription( 

452 self, 

453 ) -> Callable[[pubsub.DeleteSubscriptionRequest], empty_pb2.Empty]: 

454 r"""Return a callable for the delete subscription method over gRPC. 

455 

456 Deletes an existing subscription. All messages retained in the 

457 subscription are immediately dropped. Calls to ``Pull`` after 

458 deletion will return ``NOT_FOUND``. After a subscription is 

459 deleted, a new one may be created with the same name, but the 

460 new one has no association with the old subscription or its 

461 topic unless the same topic is specified. 

462 

463 Returns: 

464 Callable[[~.DeleteSubscriptionRequest], 

465 ~.Empty]: 

466 A function that, when called, will call the underlying RPC 

467 on the server. 

468 """ 

469 # Generate a "stub function" on-the-fly which will actually make 

470 # the request. 

471 # gRPC handles serialization and deserialization, so we just need 

472 # to pass in the functions for each. 

473 if "delete_subscription" not in self._stubs: 

474 self._stubs["delete_subscription"] = self._logged_channel.unary_unary( 

475 "/google.pubsub.v1.Subscriber/DeleteSubscription", 

476 request_serializer=pubsub.DeleteSubscriptionRequest.serialize, 

477 response_deserializer=empty_pb2.Empty.FromString, 

478 ) 

479 return self._stubs["delete_subscription"] 

480 

481 @property 

482 def modify_ack_deadline( 

483 self, 

484 ) -> Callable[[pubsub.ModifyAckDeadlineRequest], empty_pb2.Empty]: 

485 r"""Return a callable for the modify ack deadline method over gRPC. 

486 

487 Modifies the ack deadline for a specific message. This method is 

488 useful to indicate that more time is needed to process a message 

489 by the subscriber, or to make the message available for 

490 redelivery if the processing was interrupted. Note that this 

491 does not modify the subscription-level ``ackDeadlineSeconds`` 

492 used for subsequent messages. 

493 

494 Returns: 

495 Callable[[~.ModifyAckDeadlineRequest], 

496 ~.Empty]: 

497 A function that, when called, will call the underlying RPC 

498 on the server. 

499 """ 

500 # Generate a "stub function" on-the-fly which will actually make 

501 # the request. 

502 # gRPC handles serialization and deserialization, so we just need 

503 # to pass in the functions for each. 

504 if "modify_ack_deadline" not in self._stubs: 

505 self._stubs["modify_ack_deadline"] = self._logged_channel.unary_unary( 

506 "/google.pubsub.v1.Subscriber/ModifyAckDeadline", 

507 request_serializer=pubsub.ModifyAckDeadlineRequest.serialize, 

508 response_deserializer=empty_pb2.Empty.FromString, 

509 ) 

510 return self._stubs["modify_ack_deadline"] 

511 

512 @property 

513 def acknowledge(self) -> Callable[[pubsub.AcknowledgeRequest], empty_pb2.Empty]: 

514 r"""Return a callable for the acknowledge method over gRPC. 

515 

516 Acknowledges the messages associated with the ``ack_ids`` in the 

517 ``AcknowledgeRequest``. The Pub/Sub system can remove the 

518 relevant messages from the subscription. 

519 

520 Acknowledging a message whose ack deadline has expired may 

521 succeed, but such a message may be redelivered later. 

522 Acknowledging a message more than once will not result in an 

523 error. 

524 

525 Returns: 

526 Callable[[~.AcknowledgeRequest], 

527 ~.Empty]: 

528 A function that, when called, will call the underlying RPC 

529 on the server. 

530 """ 

531 # Generate a "stub function" on-the-fly which will actually make 

532 # the request. 

533 # gRPC handles serialization and deserialization, so we just need 

534 # to pass in the functions for each. 

535 if "acknowledge" not in self._stubs: 

536 self._stubs["acknowledge"] = self._logged_channel.unary_unary( 

537 "/google.pubsub.v1.Subscriber/Acknowledge", 

538 request_serializer=pubsub.AcknowledgeRequest.serialize, 

539 response_deserializer=empty_pb2.Empty.FromString, 

540 ) 

541 return self._stubs["acknowledge"] 

542 

543 @property 

544 def pull(self) -> Callable[[pubsub.PullRequest], pubsub.PullResponse]: 

545 r"""Return a callable for the pull method over gRPC. 

546 

547 Pulls messages from the server. 

548 

549 Returns: 

550 Callable[[~.PullRequest], 

551 ~.PullResponse]: 

552 A function that, when called, will call the underlying RPC 

553 on the server. 

554 """ 

555 # Generate a "stub function" on-the-fly which will actually make 

556 # the request. 

557 # gRPC handles serialization and deserialization, so we just need 

558 # to pass in the functions for each. 

559 if "pull" not in self._stubs: 

560 self._stubs["pull"] = self._logged_channel.unary_unary( 

561 "/google.pubsub.v1.Subscriber/Pull", 

562 request_serializer=pubsub.PullRequest.serialize, 

563 response_deserializer=pubsub.PullResponse.deserialize, 

564 ) 

565 return self._stubs["pull"] 

566 

567 @property 

568 def streaming_pull( 

569 self, 

570 ) -> Callable[[pubsub.StreamingPullRequest], pubsub.StreamingPullResponse]: 

571 r"""Return a callable for the streaming pull method over gRPC. 

572 

573 Establishes a stream with the server, which sends messages down 

574 to the client. The client streams acknowledgments and ack 

575 deadline modifications back to the server. The server will close 

576 the stream and return the status on any error. The server may 

577 close the stream with status ``UNAVAILABLE`` to reassign 

578 server-side resources, in which case, the client should 

579 re-establish the stream. Flow control can be achieved by 

580 configuring the underlying RPC channel. 

581 

582 Returns: 

583 Callable[[~.StreamingPullRequest], 

584 ~.StreamingPullResponse]: 

585 A function that, when called, will call the underlying RPC 

586 on the server. 

587 """ 

588 # Generate a "stub function" on-the-fly which will actually make 

589 # the request. 

590 # gRPC handles serialization and deserialization, so we just need 

591 # to pass in the functions for each. 

592 if "streaming_pull" not in self._stubs: 

593 self._stubs["streaming_pull"] = self._logged_channel.stream_stream( 

594 "/google.pubsub.v1.Subscriber/StreamingPull", 

595 request_serializer=pubsub.StreamingPullRequest.serialize, 

596 response_deserializer=pubsub.StreamingPullResponse.deserialize, 

597 ) 

598 return self._stubs["streaming_pull"] 

599 

600 @property 

601 def modify_push_config( 

602 self, 

603 ) -> Callable[[pubsub.ModifyPushConfigRequest], empty_pb2.Empty]: 

604 r"""Return a callable for the modify push config method over gRPC. 

605 

606 Modifies the ``PushConfig`` for a specified subscription. 

607 

608 This may be used to change a push subscription to a pull one 

609 (signified by an empty ``PushConfig``) or vice versa, or change 

610 the endpoint URL and other attributes of a push subscription. 

611 Messages will accumulate for delivery continuously through the 

612 call regardless of changes to the ``PushConfig``. 

613 

614 Returns: 

615 Callable[[~.ModifyPushConfigRequest], 

616 ~.Empty]: 

617 A function that, when called, will call the underlying RPC 

618 on the server. 

619 """ 

620 # Generate a "stub function" on-the-fly which will actually make 

621 # the request. 

622 # gRPC handles serialization and deserialization, so we just need 

623 # to pass in the functions for each. 

624 if "modify_push_config" not in self._stubs: 

625 self._stubs["modify_push_config"] = self._logged_channel.unary_unary( 

626 "/google.pubsub.v1.Subscriber/ModifyPushConfig", 

627 request_serializer=pubsub.ModifyPushConfigRequest.serialize, 

628 response_deserializer=empty_pb2.Empty.FromString, 

629 ) 

630 return self._stubs["modify_push_config"] 

631 

632 @property 

633 def get_snapshot(self) -> Callable[[pubsub.GetSnapshotRequest], pubsub.Snapshot]: 

634 r"""Return a callable for the get snapshot method over gRPC. 

635 

636 Gets the configuration details of a snapshot. Snapshots are used 

637 in 

638 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

639 operations, which allow you to manage message acknowledgments in 

640 bulk. That is, you can set the acknowledgment state of messages 

641 in an existing subscription to the state captured by a snapshot. 

642 

643 Returns: 

644 Callable[[~.GetSnapshotRequest], 

645 ~.Snapshot]: 

646 A function that, when called, will call the underlying RPC 

647 on the server. 

648 """ 

649 # Generate a "stub function" on-the-fly which will actually make 

650 # the request. 

651 # gRPC handles serialization and deserialization, so we just need 

652 # to pass in the functions for each. 

653 if "get_snapshot" not in self._stubs: 

654 self._stubs["get_snapshot"] = self._logged_channel.unary_unary( 

655 "/google.pubsub.v1.Subscriber/GetSnapshot", 

656 request_serializer=pubsub.GetSnapshotRequest.serialize, 

657 response_deserializer=pubsub.Snapshot.deserialize, 

658 ) 

659 return self._stubs["get_snapshot"] 

660 

661 @property 

662 def list_snapshots( 

663 self, 

664 ) -> Callable[[pubsub.ListSnapshotsRequest], pubsub.ListSnapshotsResponse]: 

665 r"""Return a callable for the list snapshots method over gRPC. 

666 

667 Lists the existing snapshots. Snapshots are used in 

668 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

669 operations, which allow you to manage message acknowledgments in 

670 bulk. That is, you can set the acknowledgment state of messages 

671 in an existing subscription to the state captured by a snapshot. 

672 

673 Returns: 

674 Callable[[~.ListSnapshotsRequest], 

675 ~.ListSnapshotsResponse]: 

676 A function that, when called, will call the underlying RPC 

677 on the server. 

678 """ 

679 # Generate a "stub function" on-the-fly which will actually make 

680 # the request. 

681 # gRPC handles serialization and deserialization, so we just need 

682 # to pass in the functions for each. 

683 if "list_snapshots" not in self._stubs: 

684 self._stubs["list_snapshots"] = self._logged_channel.unary_unary( 

685 "/google.pubsub.v1.Subscriber/ListSnapshots", 

686 request_serializer=pubsub.ListSnapshotsRequest.serialize, 

687 response_deserializer=pubsub.ListSnapshotsResponse.deserialize, 

688 ) 

689 return self._stubs["list_snapshots"] 

690 

691 @property 

692 def create_snapshot( 

693 self, 

694 ) -> Callable[[pubsub.CreateSnapshotRequest], pubsub.Snapshot]: 

695 r"""Return a callable for the create snapshot method over gRPC. 

696 

697 Creates a snapshot from the requested subscription. Snapshots 

698 are used in 

699 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

700 operations, which allow you to manage message acknowledgments in 

701 bulk. That is, you can set the acknowledgment state of messages 

702 in an existing subscription to the state captured by a snapshot. 

703 If the snapshot already exists, returns ``ALREADY_EXISTS``. If 

704 the requested subscription doesn't exist, returns ``NOT_FOUND``. 

705 If the backlog in the subscription is too old -- and the 

706 resulting snapshot would expire in less than 1 hour -- then 

707 ``FAILED_PRECONDITION`` is returned. See also the 

708 ``Snapshot.expire_time`` field. If the name is not provided in 

709 the request, the server will assign a random name for this 

710 snapshot on the same project as the subscription, conforming to 

711 the [resource name format] 

712 (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names). 

713 The generated name is populated in the returned Snapshot object. 

714 Note that for REST API requests, you must specify a name in the 

715 request. 

716 

717 Returns: 

718 Callable[[~.CreateSnapshotRequest], 

719 ~.Snapshot]: 

720 A function that, when called, will call the underlying RPC 

721 on the server. 

722 """ 

723 # Generate a "stub function" on-the-fly which will actually make 

724 # the request. 

725 # gRPC handles serialization and deserialization, so we just need 

726 # to pass in the functions for each. 

727 if "create_snapshot" not in self._stubs: 

728 self._stubs["create_snapshot"] = self._logged_channel.unary_unary( 

729 "/google.pubsub.v1.Subscriber/CreateSnapshot", 

730 request_serializer=pubsub.CreateSnapshotRequest.serialize, 

731 response_deserializer=pubsub.Snapshot.deserialize, 

732 ) 

733 return self._stubs["create_snapshot"] 

734 

735 @property 

736 def update_snapshot( 

737 self, 

738 ) -> Callable[[pubsub.UpdateSnapshotRequest], pubsub.Snapshot]: 

739 r"""Return a callable for the update snapshot method over gRPC. 

740 

741 Updates an existing snapshot by updating the fields specified in 

742 the update mask. Snapshots are used in 

743 `Seek <https://cloud.google.com/pubsub/docs/replay-overview>`__ 

744 operations, which allow you to manage message acknowledgments in 

745 bulk. That is, you can set the acknowledgment state of messages 

746 in an existing subscription to the state captured by a snapshot. 

747 

748 Returns: 

749 Callable[[~.UpdateSnapshotRequest], 

750 ~.Snapshot]: 

751 A function that, when called, will call the underlying RPC 

752 on the server. 

753 """ 

754 # Generate a "stub function" on-the-fly which will actually make 

755 # the request. 

756 # gRPC handles serialization and deserialization, so we just need 

757 # to pass in the functions for each. 

758 if "update_snapshot" not in self._stubs: 

759 self._stubs["update_snapshot"] = self._logged_channel.unary_unary( 

760 "/google.pubsub.v1.Subscriber/UpdateSnapshot", 

761 request_serializer=pubsub.UpdateSnapshotRequest.serialize, 

762 response_deserializer=pubsub.Snapshot.deserialize, 

763 ) 

764 return self._stubs["update_snapshot"] 

765 

766 @property 

767 def delete_snapshot( 

768 self, 

769 ) -> Callable[[pubsub.DeleteSnapshotRequest], empty_pb2.Empty]: 

770 r"""Return a callable for the delete snapshot method over gRPC. 

771 

772 Removes an existing snapshot. Snapshots are used in [Seek] 

773 (https://cloud.google.com/pubsub/docs/replay-overview) 

774 operations, which allow you to manage message acknowledgments in 

775 bulk. That is, you can set the acknowledgment state of messages 

776 in an existing subscription to the state captured by a snapshot. 

777 When the snapshot is deleted, all messages retained in the 

778 snapshot are immediately dropped. After a snapshot is deleted, a 

779 new one may be created with the same name, but the new one has 

780 no association with the old snapshot or its subscription, unless 

781 the same subscription is specified. 

782 

783 Returns: 

784 Callable[[~.DeleteSnapshotRequest], 

785 ~.Empty]: 

786 A function that, when called, will call the underlying RPC 

787 on the server. 

788 """ 

789 # Generate a "stub function" on-the-fly which will actually make 

790 # the request. 

791 # gRPC handles serialization and deserialization, so we just need 

792 # to pass in the functions for each. 

793 if "delete_snapshot" not in self._stubs: 

794 self._stubs["delete_snapshot"] = self._logged_channel.unary_unary( 

795 "/google.pubsub.v1.Subscriber/DeleteSnapshot", 

796 request_serializer=pubsub.DeleteSnapshotRequest.serialize, 

797 response_deserializer=empty_pb2.Empty.FromString, 

798 ) 

799 return self._stubs["delete_snapshot"] 

800 

801 @property 

802 def seek(self) -> Callable[[pubsub.SeekRequest], pubsub.SeekResponse]: 

803 r"""Return a callable for the seek method over gRPC. 

804 

805 Seeks an existing subscription to a point in time or to a given 

806 snapshot, whichever is provided in the request. Snapshots are 

807 used in [Seek] 

808 (https://cloud.google.com/pubsub/docs/replay-overview) 

809 operations, which allow you to manage message acknowledgments in 

810 bulk. That is, you can set the acknowledgment state of messages 

811 in an existing subscription to the state captured by a snapshot. 

812 Note that both the subscription and the snapshot must be on the 

813 same topic. 

814 

815 Returns: 

816 Callable[[~.SeekRequest], 

817 ~.SeekResponse]: 

818 A function that, when called, will call the underlying RPC 

819 on the server. 

820 """ 

821 # Generate a "stub function" on-the-fly which will actually make 

822 # the request. 

823 # gRPC handles serialization and deserialization, so we just need 

824 # to pass in the functions for each. 

825 if "seek" not in self._stubs: 

826 self._stubs["seek"] = self._logged_channel.unary_unary( 

827 "/google.pubsub.v1.Subscriber/Seek", 

828 request_serializer=pubsub.SeekRequest.serialize, 

829 response_deserializer=pubsub.SeekResponse.deserialize, 

830 ) 

831 return self._stubs["seek"] 

832 

833 def close(self): 

834 self._logged_channel.close() 

835 

836 @property 

837 def set_iam_policy( 

838 self, 

839 ) -> Callable[[iam_policy_pb2.SetIamPolicyRequest], policy_pb2.Policy]: 

840 r"""Return a callable for the set iam policy method over gRPC. 

841 Sets the IAM access control policy on the specified 

842 function. Replaces any existing policy. 

843 Returns: 

844 Callable[[~.SetIamPolicyRequest], 

845 ~.Policy]: 

846 A function that, when called, will call the underlying RPC 

847 on the server. 

848 """ 

849 # Generate a "stub function" on-the-fly which will actually make 

850 # the request. 

851 # gRPC handles serialization and deserialization, so we just need 

852 # to pass in the functions for each. 

853 if "set_iam_policy" not in self._stubs: 

854 self._stubs["set_iam_policy"] = self._logged_channel.unary_unary( 

855 "/google.iam.v1.IAMPolicy/SetIamPolicy", 

856 request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString, 

857 response_deserializer=policy_pb2.Policy.FromString, 

858 ) 

859 return self._stubs["set_iam_policy"] 

860 

861 @property 

862 def get_iam_policy( 

863 self, 

864 ) -> Callable[[iam_policy_pb2.GetIamPolicyRequest], policy_pb2.Policy]: 

865 r"""Return a callable for the get iam policy method over gRPC. 

866 Gets the IAM access control policy for a function. 

867 Returns an empty policy if the function exists and does 

868 not have a policy set. 

869 Returns: 

870 Callable[[~.GetIamPolicyRequest], 

871 ~.Policy]: 

872 A function that, when called, will call the underlying RPC 

873 on the server. 

874 """ 

875 # Generate a "stub function" on-the-fly which will actually make 

876 # the request. 

877 # gRPC handles serialization and deserialization, so we just need 

878 # to pass in the functions for each. 

879 if "get_iam_policy" not in self._stubs: 

880 self._stubs["get_iam_policy"] = self._logged_channel.unary_unary( 

881 "/google.iam.v1.IAMPolicy/GetIamPolicy", 

882 request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString, 

883 response_deserializer=policy_pb2.Policy.FromString, 

884 ) 

885 return self._stubs["get_iam_policy"] 

886 

887 @property 

888 def test_iam_permissions( 

889 self, 

890 ) -> Callable[ 

891 [iam_policy_pb2.TestIamPermissionsRequest], 

892 iam_policy_pb2.TestIamPermissionsResponse, 

893 ]: 

894 r"""Return a callable for the test iam permissions method over gRPC. 

895 Tests the specified permissions against the IAM access control 

896 policy for a function. If the function does not exist, this will 

897 return an empty set of permissions, not a NOT_FOUND error. 

898 Returns: 

899 Callable[[~.TestIamPermissionsRequest], 

900 ~.TestIamPermissionsResponse]: 

901 A function that, when called, will call the underlying RPC 

902 on the server. 

903 """ 

904 # Generate a "stub function" on-the-fly which will actually make 

905 # the request. 

906 # gRPC handles serialization and deserialization, so we just need 

907 # to pass in the functions for each. 

908 if "test_iam_permissions" not in self._stubs: 

909 self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary( 

910 "/google.iam.v1.IAMPolicy/TestIamPermissions", 

911 request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString, 

912 response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString, 

913 ) 

914 return self._stubs["test_iam_permissions"] 

915 

916 @property 

917 def kind(self) -> str: 

918 return "grpc" 

919 

920 

921__all__ = ("SubscriberGrpcTransport",)