Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/services/firestore/transports/grpc_asyncio.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

184 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.cloud.firestore_v1.types import document 

37from google.cloud.firestore_v1.types import document as gf_document 

38from google.cloud.firestore_v1.types import firestore 

39from google.cloud.location import locations_pb2 # type: ignore 

40from google.longrunning import operations_pb2 # type: ignore 

41from google.protobuf import empty_pb2 # type: ignore 

42from .base import FirestoreTransport, DEFAULT_CLIENT_INFO 

43from .grpc import FirestoreGrpcTransport 

44 

# Probe for the optional structured client-logging support in api_core.
# The flag is consulted before any DEBUG request/response logging is done.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger, named after this module.
_LOGGER = std_logging.getLogger(__name__)

53 

54 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Unary-unary client interceptor that emits DEBUG logs for each RPC.

    When ``CLIENT_LOGGING_SUPPORTED`` is true and ``_LOGGER`` is enabled for
    DEBUG, the request is logged before the call is made and the response is
    logged once it has resolved. Otherwise the call passes straight through.
    """

    @staticmethod
    def _payload_for_log(message):
        """Render ``message`` as a string suitable for a log record.

        proto-plus messages and raw protobuf messages are rendered as JSON;
        anything else falls back to its type name followed by its pickled
        bytes.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and response when enabled.

        Args:
            continuation: Awaitable callable that performs the rest of the
                call; invoked with ``client_call_details`` and ``request``.
            client_call_details: Details of the call; ``method`` and
                ``metadata`` are read for logging.
            request: The request message being sent.

        Returns:
            The call object produced by ``continuation``. When logging is
            enabled it has already been awaited here, so any exception
            raised while awaiting it propagates to the caller.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` is optional on the call details, so tolerate None
            # (or an empty container) instead of failing on iteration.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._payload_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.firestore.v1.Firestore",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Flatten the gRPC metadata container into a plain dict of
            # strings; keep ``None`` when there is no trailing metadata.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.firestore.v1.Firestore",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

120 

121 

122class FirestoreGrpcAsyncIOTransport(FirestoreTransport): 

123 """gRPC AsyncIO backend transport for Firestore. 

124 

125 The Cloud Firestore service. 

126 

127 Cloud Firestore is a fast, fully managed, serverless, 

128 cloud-native NoSQL document database that simplifies storing, 

129 syncing, and querying data for your mobile, web, and IoT apps at 

130 global scale. Its client libraries provide live synchronization 

131 and offline support, while its security features and 

132 integrations with Firebase and Google Cloud Platform accelerate 

133 building truly serverless apps. 

134 

135 This class defines the same methods as the primary client, so the 

136 primary client can load the underlying transport implementation 

137 and call it. 

138 

139 It sends protocol buffers over the wire using gRPC (which is built on 

140 top of HTTP/2); the ``grpcio`` package must be installed. 

141 """ 

142 

143 _grpc_channel: aio.Channel 

144 _stubs: Dict[str, Callable] = {} 

145 

@classmethod
def create_channel(
    cls,
    host: str = "firestore.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    quota_project_id: Optional[str] = None,
    **kwargs,
) -> aio.Channel:
    """Build a gRPC AsyncIO channel for this transport.

    The call is delegated to ``grpc_helpers_async.create_channel``, with
    the class-level ``AUTH_SCOPES`` and ``DEFAULT_HOST`` supplied as the
    default scopes and default host.

    Args:
        host (Optional[str]): The host for the channel to use.
        credentials (Optional[~.Credentials]): The authorization
            credentials to attach to requests. These credentials identify
            this application to the service. If none are specified, the
            client will attempt to ascertain the credentials from the
            environment.
        credentials_file (Optional[str]): Deprecated. A file with
            credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`. This argument
            will be removed in the next major version of this library.
        scopes (Optional[Sequence[str]]): An optional list of scopes
            needed for this service. These are only used when credentials
            are not specified and are passed to
            :func:`google.auth.default`.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        kwargs (Optional[dict]): Extra keyword arguments, passed through
            to the channel creation.

    Returns:
        aio.Channel: A gRPC AsyncIO channel object.
    """
    new_channel = grpc_helpers_async.create_channel(
        host,
        credentials=credentials,
        credentials_file=credentials_file,
        quota_project_id=quota_project_id,
        scopes=scopes,
        default_scopes=cls.AUTH_SCOPES,
        default_host=cls.DEFAULT_HOST,
        **kwargs,
    )
    return new_channel

188 

def __init__(
    self,
    *,
    host: str = "firestore.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
    api_mtls_endpoint: Optional[str] = None,
    client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    api_audience: Optional[str] = None,
) -> None:
    """Set up the transport and its underlying gRPC AsyncIO channel.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'firestore.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
            This argument is ignored if a ``channel`` instance is provided.
        credentials_file (Optional[str]): Deprecated. A file with
            credentials that can be loaded with
            :func:`google.auth.load_credentials_from_file`.
            This argument is ignored if a ``channel`` instance is provided.
            This argument will be removed in the next major version of
            this library.
        scopes (Optional[Sequence[str]]): An optional list of scopes
            needed for this service. These are only used when credentials
            are not specified and are passed to
            :func:`google.auth.default`.
        channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
            A ``Channel`` instance through which to make calls, or a
            Callable that constructs and returns one. If set to None,
            ``self.create_channel`` is used to create the channel. If a
            Callable is given, it will be called with the same arguments
            as used in ``self.create_channel``.
        api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
            endpoint. If provided, it overrides the ``host`` argument and
            tries to create a mutual TLS channel with client SSL
            credentials from ``client_cert_source`` or application default
            SSL credentials.
        client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
            Deprecated. A callback to provide client SSL certificate bytes
            and private key bytes, both in PEM format. It is ignored if
            ``api_mtls_endpoint`` is None.
        ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
            for the grpc channel. It is ignored if a ``channel`` instance
            is provided.
        client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
            A callback to provide client certificate bytes and private key
            bytes, both in PEM format. It is used to configure a mutual
            TLS channel. It is ignored if a ``channel`` instance or
            ``ssl_channel_credentials`` is provided.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you're developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT
            should be used for service account credentials.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport's constructor.

    Raises:
        google.auth.exceptions.MutualTlsChannelError: If mutual TLS
            transport creation failed for any reason.
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    self._grpc_channel = None
    self._ssl_channel_credentials = ssl_channel_credentials
    self._stubs: Dict[str, Callable] = {}

    # Both legacy mTLS arguments still work but are flagged as deprecated.
    if api_mtls_endpoint:
        warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
    if client_cert_source:
        warnings.warn("client_cert_source is deprecated", DeprecationWarning)

    if isinstance(channel, aio.Channel):
        # A ready-made channel wins: adopt it and drop any credentials,
        # including SSL credentials, that were passed alongside it.
        credentials = None
        self._ignore_credentials = True
        self._grpc_channel = channel
        self._ssl_channel_credentials = None
    elif api_mtls_endpoint:
        # Legacy mTLS path: the mTLS endpoint replaces the host, and SSL
        # credentials come from the callback or the application default.
        host = api_mtls_endpoint
        if client_cert_source:
            cert_pem, key_pem = client_cert_source()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_pem, private_key=key_pem
            )
        else:
            self._ssl_channel_credentials = SslCredentials().ssl_credentials
    elif client_cert_source_for_mtls and not ssl_channel_credentials:
        # Current mTLS path: only consulted when the caller did not hand
        # over explicit SSL channel credentials.
        cert_pem, key_pem = client_cert_source_for_mtls()
        self._ssl_channel_credentials = grpc.ssl_channel_credentials(
            certificate_chain=cert_pem, private_key=key_pem
        )

    # The base transport sets the host, credentials and scopes.
    super().__init__(
        host=host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

    if not self._grpc_channel:
        # No channel instance was supplied: build one with the supplied
        # factory callable, or with this class's own ``create_channel``.
        channel_factory = channel if channel else type(self).create_channel
        message_size_options = [
            ("grpc.max_send_message_length", -1),
            ("grpc.max_receive_message_length", -1),
        ]
        self._grpc_channel = channel_factory(
            self._host,
            # Use the credentials saved by the base transport; the
            # credentials file is deliberately not passed a second time.
            credentials=self._credentials,
            credentials_file=None,
            scopes=self._scopes,
            ssl_credentials=self._ssl_channel_credentials,
            quota_project_id=quota_project_id,
            options=message_size_options,
        )

    # Attach the logging interceptor to the channel's unary-unary chain.
    self._interceptor = _LoggingClientAIOInterceptor()
    self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
    self._logged_channel = self._grpc_channel

    # Record whether the installed ``wrap_method`` accepts a ``kind`` argument.
    wrap_method_params = inspect.signature(
        gapic_v1.method_async.wrap_method
    ).parameters
    self._wrap_with_kind = "kind" in wrap_method_params

    # Wrap messages. This must be done after self._logged_channel exists.
    self._prep_wrapped_messages(client_info)

335 

@property
def grpc_channel(self) -> aio.Channel:
    """The gRPC AsyncIO channel held by this transport.

    The channel is stored on the instance, so every access returns the
    same object.
    """
    cached_channel = self._grpc_channel
    return cached_channel

345 

@property
def get_document(
    self,
) -> Callable[[firestore.GetDocumentRequest], Awaitable[document.Document]]:
    r"""Callable for the ``GetDocument`` RPC, which gets a single document.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.GetDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "get_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/GetDocument",
        request_serializer=firestore.GetDocumentRequest.serialize,
        response_deserializer=document.Document.deserialize,
    )
    return self._stubs[stub_key]

371 

@property
def list_documents(
    self,
) -> Callable[
    [firestore.ListDocumentsRequest], Awaitable[firestore.ListDocumentsResponse]
]:
    r"""Callable for the ``ListDocuments`` RPC, which lists documents.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.ListDocumentsRequest],
                Awaitable[~.ListDocumentsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_documents"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/ListDocuments",
        request_serializer=firestore.ListDocumentsRequest.serialize,
        response_deserializer=firestore.ListDocumentsResponse.deserialize,
    )
    return self._stubs[stub_key]

399 

@property
def update_document(
    self,
) -> Callable[[firestore.UpdateDocumentRequest], Awaitable[gf_document.Document]]:
    r"""Callable for the ``UpdateDocument`` RPC.

    Updates or inserts a document. The unary-unary stub is created from
    the logged channel on first access and cached in ``self._stubs``
    afterwards.

    Returns:
        Callable[[~.UpdateDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "update_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/UpdateDocument",
        request_serializer=firestore.UpdateDocumentRequest.serialize,
        response_deserializer=gf_document.Document.deserialize,
    )
    return self._stubs[stub_key]

425 

@property
def delete_document(
    self,
) -> Callable[[firestore.DeleteDocumentRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable for the ``DeleteDocument`` RPC, which deletes a document.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.DeleteDocumentRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "delete_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/DeleteDocument",
        request_serializer=firestore.DeleteDocumentRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

451 

@property
def batch_get_documents(
    self,
) -> Callable[
    [firestore.BatchGetDocumentsRequest],
    Awaitable[firestore.BatchGetDocumentsResponse],
]:
    r"""Callable for the ``BatchGetDocuments`` RPC, which gets multiple documents.

    Documents returned by this method are not guaranteed to
    be returned in the same order that they were requested.

    The unary-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.BatchGetDocumentsRequest],
                Awaitable[~.BatchGetDocumentsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "batch_get_documents"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_stream(
        "/google.firestore.v1.Firestore/BatchGetDocuments",
        request_serializer=firestore.BatchGetDocumentsRequest.serialize,
        response_deserializer=firestore.BatchGetDocumentsResponse.deserialize,
    )
    return self._stubs[stub_key]

483 

@property
def begin_transaction(
    self,
) -> Callable[
    [firestore.BeginTransactionRequest],
    Awaitable[firestore.BeginTransactionResponse],
]:
    r"""Callable for the ``BeginTransaction`` RPC, which starts a new transaction.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.BeginTransactionRequest],
                Awaitable[~.BeginTransactionResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "begin_transaction"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/BeginTransaction",
        request_serializer=firestore.BeginTransactionRequest.serialize,
        response_deserializer=firestore.BeginTransactionResponse.deserialize,
    )
    return self._stubs[stub_key]

512 

@property
def commit(
    self,
) -> Callable[[firestore.CommitRequest], Awaitable[firestore.CommitResponse]]:
    r"""Callable for the ``Commit`` RPC.

    Commits a transaction, while optionally updating documents. The
    unary-unary stub is created from the logged channel on first access
    and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.CommitRequest],
                Awaitable[~.CommitResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "commit"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/Commit",
        request_serializer=firestore.CommitRequest.serialize,
        response_deserializer=firestore.CommitResponse.deserialize,
    )
    return self._stubs[stub_key]

539 

@property
def rollback(
    self,
) -> Callable[[firestore.RollbackRequest], Awaitable[empty_pb2.Empty]]:
    r"""Callable for the ``Rollback`` RPC, which rolls back a transaction.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.RollbackRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "rollback"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/Rollback",
        request_serializer=firestore.RollbackRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

565 

@property
def run_query(
    self,
) -> Callable[[firestore.RunQueryRequest], Awaitable[firestore.RunQueryResponse]]:
    r"""Callable for the ``RunQuery`` RPC, which runs a query.

    The unary-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.RunQueryRequest],
                Awaitable[~.RunQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "run_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_stream(
        "/google.firestore.v1.Firestore/RunQuery",
        request_serializer=firestore.RunQueryRequest.serialize,
        response_deserializer=firestore.RunQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

591 

@property
def execute_pipeline(
    self,
) -> Callable[
    [firestore.ExecutePipelineRequest], Awaitable[firestore.ExecutePipelineResponse]
]:
    r"""Callable for the ``ExecutePipeline`` RPC, which executes a pipeline query.

    The unary-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.ExecutePipelineRequest],
                Awaitable[~.ExecutePipelineResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "execute_pipeline"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_stream(
        "/google.firestore.v1.Firestore/ExecutePipeline",
        request_serializer=firestore.ExecutePipelineRequest.serialize,
        response_deserializer=firestore.ExecutePipelineResponse.deserialize,
    )
    return self._stubs[stub_key]

619 

@property
def run_aggregation_query(
    self,
) -> Callable[
    [firestore.RunAggregationQueryRequest],
    Awaitable[firestore.RunAggregationQueryResponse],
]:
    r"""Callable for the ``RunAggregationQuery`` RPC, which runs an aggregation query.

    Rather than producing [Document][google.firestore.v1.Document]
    results like
    [Firestore.RunQuery][google.firestore.v1.Firestore.RunQuery],
    this API allows running an aggregation to produce a series of
    [AggregationResult][google.firestore.v1.AggregationResult]
    server-side.

    High-Level Example:

    ::

        -- Return the number of documents in table given a filter.
        SELECT COUNT(*) FROM ( SELECT * FROM k where a = true );

    The unary-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.RunAggregationQueryRequest],
                Awaitable[~.RunAggregationQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "run_aggregation_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_stream(
        "/google.firestore.v1.Firestore/RunAggregationQuery",
        request_serializer=firestore.RunAggregationQueryRequest.serialize,
        response_deserializer=firestore.RunAggregationQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

662 

@property
def partition_query(
    self,
) -> Callable[
    [firestore.PartitionQueryRequest], Awaitable[firestore.PartitionQueryResponse]
]:
    r"""Callable for the ``PartitionQuery`` RPC.

    Partitions a query by returning partition cursors
    that can be used to run the query in parallel. The
    returned partition cursors are split points that can be
    used by RunQuery as starting/end points for the query
    results.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.PartitionQueryRequest],
                Awaitable[~.PartitionQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "partition_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/PartitionQuery",
        request_serializer=firestore.PartitionQueryRequest.serialize,
        response_deserializer=firestore.PartitionQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

694 

@property
def write(
    self,
) -> Callable[[firestore.WriteRequest], Awaitable[firestore.WriteResponse]]:
    r"""Callable for the ``Write`` RPC.

    Streams batches of document updates and deletes, in
    order. This method is only available via gRPC or
    WebChannel (not REST).

    The stream-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.WriteRequest],
                Awaitable[~.WriteResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "write"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.stream_stream(
        "/google.firestore.v1.Firestore/Write",
        request_serializer=firestore.WriteRequest.serialize,
        response_deserializer=firestore.WriteResponse.deserialize,
    )
    return self._stubs[stub_key]

722 

@property
def listen(
    self,
) -> Callable[[firestore.ListenRequest], Awaitable[firestore.ListenResponse]]:
    r"""Callable for the ``Listen`` RPC.

    Listens to changes. This method is only available via
    gRPC or WebChannel (not REST).

    The stream-stream stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.ListenRequest],
                Awaitable[~.ListenResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "listen"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.stream_stream(
        "/google.firestore.v1.Firestore/Listen",
        request_serializer=firestore.ListenRequest.serialize,
        response_deserializer=firestore.ListenResponse.deserialize,
    )
    return self._stubs[stub_key]

749 

@property
def list_collection_ids(
    self,
) -> Callable[
    [firestore.ListCollectionIdsRequest],
    Awaitable[firestore.ListCollectionIdsResponse],
]:
    r"""Callable for the ``ListCollectionIds`` RPC.

    Lists all the collection IDs underneath a document. The unary-unary
    stub is created from the logged channel on first access and cached
    in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.ListCollectionIdsRequest],
                Awaitable[~.ListCollectionIdsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_collection_ids"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/ListCollectionIds",
        request_serializer=firestore.ListCollectionIdsRequest.serialize,
        response_deserializer=firestore.ListCollectionIdsResponse.deserialize,
    )
    return self._stubs[stub_key]

778 

@property
def batch_write(
    self,
) -> Callable[
    [firestore.BatchWriteRequest], Awaitable[firestore.BatchWriteResponse]
]:
    r"""Callable for the ``BatchWrite`` RPC, which applies a batch of write operations.

    The BatchWrite method does not apply the write operations
    atomically and can apply them out of order. Method does not
    allow more than one write per document. Each write succeeds or
    fails independently. See the
    [BatchWriteResponse][google.firestore.v1.BatchWriteResponse] for
    the success status of each write.

    If you require an atomically applied set of writes, use
    [Commit][google.firestore.v1.Firestore.Commit] instead.

    The unary-unary stub is created from the logged channel on first
    access and cached in ``self._stubs`` afterwards.

    Returns:
        Callable[[~.BatchWriteRequest],
                Awaitable[~.BatchWriteResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "batch_write"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # gRPC drives (de)serialization itself; it only needs the codec hooks.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/BatchWrite",
        request_serializer=firestore.BatchWriteRequest.serialize,
        response_deserializer=firestore.BatchWriteResponse.deserialize,
    )
    return self._stubs[stub_key]

816 

@property
def create_document(
    self,
) -> Callable[[firestore.CreateDocumentRequest], Awaitable[document.Document]]:
    r"""Expose the CreateDocument RPC as a callable over gRPC.

    The RPC creates a new document.

    Returns:
        Callable[[~.CreateDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "create_document"
    # An earlier access already built the stub: hand the cached one back.
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: create the stub lazily. gRPC drives serialization and
    # deserialization itself, so only the two conversion functions are
    # supplied here.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.firestore.v1.Firestore/CreateDocument",
        request_serializer=firestore.CreateDocumentRequest.serialize,
        response_deserializer=document.Document.deserialize,
    )
    return self._stubs[stub_key]

842 

def _prep_wrapped_messages(self, client_info):
    """Precompute the wrapped methods with their default retry/timeout policy.

    Overrides the base class method so that the async wrappers are used.
    The resulting table is stored on ``self._wrapped_methods`` and is keyed
    by the value of the transport property for each RPC.

    Args:
        client_info: Forwarded unchanged to every wrapped method.
    """
    # Exception groups that the default retry predicates react to.
    transient_errors = (
        core_exceptions.DeadlineExceeded,
        core_exceptions.InternalServerError,
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )
    capacity_errors = (
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )
    batch_write_errors = (
        core_exceptions.Aborted,
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )

    def backoff(error_types, window):
        # Every RPC shares the same backoff curve; only the retryable
        # exception set and the overall deadline vary. A fresh AsyncRetry
        # is built on each call so no instance is shared between RPCs.
        return retries.AsyncRetry(
            initial=0.1,
            maximum=60.0,
            multiplier=1.3,
            predicate=retries.if_exception_type(*error_types),
            deadline=window,
        )

    def wrapped(rpc, timeout, retry=None):
        # ``default_retry`` is only forwarded for RPCs that define one.
        options = {}
        if retry is not None:
            options["default_retry"] = retry
        options["default_timeout"] = timeout
        options["client_info"] = client_info
        return self._wrap_method(rpc, **options)

    self._wrapped_methods = {
        self.get_document: wrapped(
            self.get_document, 60.0, backoff(transient_errors, 60.0)
        ),
        self.list_documents: wrapped(
            self.list_documents, 60.0, backoff(transient_errors, 60.0)
        ),
        self.update_document: wrapped(
            self.update_document, 60.0, backoff(capacity_errors, 60.0)
        ),
        self.delete_document: wrapped(
            self.delete_document, 60.0, backoff(transient_errors, 60.0)
        ),
        self.batch_get_documents: wrapped(
            self.batch_get_documents, 300.0, backoff(transient_errors, 300.0)
        ),
        self.begin_transaction: wrapped(
            self.begin_transaction, 60.0, backoff(transient_errors, 60.0)
        ),
        self.commit: wrapped(
            self.commit, 60.0, backoff(capacity_errors, 60.0)
        ),
        self.rollback: wrapped(
            self.rollback, 60.0, backoff(transient_errors, 60.0)
        ),
        self.run_query: wrapped(
            self.run_query, 300.0, backoff(transient_errors, 300.0)
        ),
        self.execute_pipeline: wrapped(
            self.execute_pipeline, 300.0, backoff(transient_errors, 300.0)
        ),
        self.run_aggregation_query: wrapped(
            self.run_aggregation_query, 300.0, backoff(transient_errors, 300.0)
        ),
        self.partition_query: wrapped(
            self.partition_query, 300.0, backoff(transient_errors, 300.0)
        ),
        # ``write`` has a default timeout but no default retry.
        self.write: wrapped(self.write, 86400.0),
        self.listen: wrapped(
            self.listen, 86400.0, backoff(transient_errors, 86400.0)
        ),
        self.list_collection_ids: wrapped(
            self.list_collection_ids, 60.0, backoff(transient_errors, 60.0)
        ),
        self.batch_write: wrapped(
            self.batch_write, 60.0, backoff(batch_write_errors, 60.0)
        ),
        self.create_document: wrapped(
            self.create_document, 60.0, backoff(capacity_errors, 60.0)
        ),
        # The long-running-operations RPCs get neither a default retry
        # nor a default timeout.
        self.cancel_operation: wrapped(self.cancel_operation, None),
        self.delete_operation: wrapped(self.delete_operation, None),
        self.get_operation: wrapped(self.get_operation, None),
        self.list_operations: wrapped(self.list_operations, None),
    }

1137 

def _wrap_method(self, func, *args, **kwargs):
    """Wrap ``func`` with the async GAPIC method wrapper.

    Positional and keyword arguments are forwarded to
    ``gapic_v1.method_async.wrap_method``. When ``self._wrap_with_kind``
    is truthy, a ``kind`` keyword carrying ``self.kind`` is set as well,
    replacing any ``kind`` supplied by the caller.
    """
    wrap_options = dict(kwargs)
    if self._wrap_with_kind:  # pragma: NO COVER
        wrap_options.update(kind=self.kind)
    return gapic_v1.method_async.wrap_method(func, *args, **wrap_options)

1142 

def close(self):
    """Close the logged channel and return whatever its ``close()`` returns."""
    channel = self._logged_channel
    return channel.close()

1145 

@property
def kind(self) -> str:
    """Return the identifier string of this transport flavour."""
    transport_kind = "grpc_asyncio"
    return transport_kind

1149 

@property
def delete_operation(
    self,
) -> Callable[[operations_pb2.DeleteOperationRequest], None]:
    r"""Expose the DeleteOperation RPC as a callable over gRPC."""
    stub_key = "delete_operation"
    # An earlier access already built the stub: hand the cached one back.
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: create the stub lazily. Only a request serializer is
    # supplied; no response deserializer is configured for this RPC.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.longrunning.Operations/DeleteOperation",
        request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString,
        response_deserializer=None,
    )
    return self._stubs[stub_key]

1166 

@property
def cancel_operation(
    self,
) -> Callable[[operations_pb2.CancelOperationRequest], None]:
    r"""Expose the CancelOperation RPC as a callable over gRPC."""
    stub_key = "cancel_operation"
    # An earlier access already built the stub: hand the cached one back.
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: create the stub lazily. Only a request serializer is
    # supplied; no response deserializer is configured for this RPC.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.longrunning.Operations/CancelOperation",
        request_serializer=operations_pb2.CancelOperationRequest.SerializeToString,
        response_deserializer=None,
    )
    return self._stubs[stub_key]

1183 

@property
def get_operation(
    self,
) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
    r"""Expose the GetOperation RPC as a callable over gRPC."""
    stub_key = "get_operation"
    # An earlier access already built the stub: hand the cached one back.
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: create the stub lazily. gRPC drives serialization and
    # deserialization itself, so only the two conversion functions are
    # supplied here.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.longrunning.Operations/GetOperation",
        request_serializer=operations_pb2.GetOperationRequest.SerializeToString,
        response_deserializer=operations_pb2.Operation.FromString,
    )
    return self._stubs[stub_key]

1200 

@property
def list_operations(
    self,
) -> Callable[
    [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
]:
    r"""Expose the ListOperations RPC as a callable over gRPC."""
    stub_key = "list_operations"
    # An earlier access already built the stub: hand the cached one back.
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: create the stub lazily. gRPC drives serialization and
    # deserialization itself, so only the two conversion functions are
    # supplied here.
    self._stubs[stub_key] = self._logged_channel.unary_unary(
        "/google.longrunning.Operations/ListOperations",
        request_serializer=operations_pb2.ListOperationsRequest.SerializeToString,
        response_deserializer=operations_pb2.ListOperationsResponse.FromString,
    )
    return self._stubs[stub_key]

1219 

1220 

# Public names of this module: a single entry, "FirestoreGrpcAsyncIOTransport".
__all__ = ("FirestoreGrpcAsyncIOTransport",)