Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/services/firestore/transports/grpc_asyncio.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

179 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import inspect 

17import json 

18import pickle 

19import logging as std_logging 

20import warnings 

21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

22 

23from google.api_core import gapic_v1 

24from google.api_core import grpc_helpers_async 

25from google.api_core import exceptions as core_exceptions 

26from google.api_core import retry_async as retries 

27from google.auth import credentials as ga_credentials # type: ignore 

28from google.auth.transport.grpc import SslCredentials # type: ignore 

29from google.protobuf.json_format import MessageToJson 

30import google.protobuf.message 

31 

32import grpc # type: ignore 

33import proto # type: ignore 

34from grpc.experimental import aio # type: ignore 

35 

36from google.cloud.firestore_v1.types import document 

37from google.cloud.firestore_v1.types import document as gf_document 

38from google.cloud.firestore_v1.types import firestore 

39from google.cloud.location import locations_pb2 # type: ignore 

40from google.longrunning import operations_pb2 # type: ignore 

41from google.protobuf import empty_pb2 # type: ignore 

42from .base import FirestoreTransport, DEFAULT_CLIENT_INFO 

43from .grpc import FirestoreGrpcTransport 

44 

45try: 

46 from google.api_core import client_logging # type: ignore 

47 

48 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

49except ImportError: # pragma: NO COVER 

50 CLIENT_LOGGING_SUPPORTED = False 

51 

52_LOGGER = std_logging.getLogger(__name__) 

53 

54 

class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Unary-unary AsyncIO client interceptor that logs each RPC at DEBUG level.

    Logging only happens when ``CLIENT_LOGGING_SUPPORTED`` is true and the
    module logger is enabled for ``DEBUG``; otherwise the call is passed
    straight through to ``continuation``.
    """

    @staticmethod
    def _payload_for_log(message):
        """Render a request or response message as a string for a log record.

        Args:
            message: The request or response object of the RPC.

        Returns:
            str: JSON for proto-plus and protobuf messages; for anything
            else, the type name followed by the pickled bytes.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # Fallback for non-protobuf payloads: pickle is only used to dump
        # (never to load), so no untrusted data is deserialized here.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Invoke the RPC, logging the request and response when enabled.

        Args:
            continuation: Callable that proceeds with the invocation; it is
                awaited with ``client_call_details`` and ``request``.
            client_call_details: Details of the call; ``method`` and
                ``metadata`` are read for logging.
            request: The request message of the RPC.

        Returns:
            The call object produced by ``continuation``.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            # ``metadata`` may be None when the caller attached none; treat
            # that as empty instead of failing while trying to iterate it.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": self._payload_for_log(request),
                "requestMethod": "grpc",
                "metadata": request_metadata,
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.firestore.v1.Firestore",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Convert the gRPC metadata object into a plain dict of strings.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._payload_for_log(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.firestore.v1.Firestore",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

120 

121 

122class FirestoreGrpcAsyncIOTransport(FirestoreTransport): 

123 """gRPC AsyncIO backend transport for Firestore. 

124 

125 The Cloud Firestore service. 

126 

127 Cloud Firestore is a fast, fully managed, serverless, 

128 cloud-native NoSQL document database that simplifies storing, 

129 syncing, and querying data for your mobile, web, and IoT apps at 

130 global scale. Its client libraries provide live synchronization 

131 and offline support, while its security features and 

132 integrations with Firebase and Google Cloud Platform accelerate 

133 building truly serverless apps. 

134 

135 This class defines the same methods as the primary client, so the 

136 primary client can load the underlying transport implementation 

137 and call it. 

138 

139 It sends protocol buffers over the wire using gRPC (which is built on 

140 top of HTTP/2); the ``grpcio`` package must be installed. 

141 """ 

142 

143 _grpc_channel: aio.Channel 

144 _stubs: Dict[str, Callable] = {} 

145 

@classmethod
def create_channel(
    cls,
    host: str = "firestore.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    quota_project_id: Optional[str] = None,
    **kwargs,
) -> aio.Channel:
    """Build a gRPC AsyncIO channel for this service.

    The work is delegated to ``grpc_helpers_async.create_channel``, with
    the class-level ``AUTH_SCOPES`` and ``DEFAULT_HOST`` supplied as the
    default scopes and default host.

    Args:
        host (Optional[str]): The host for the channel to use.
        credentials (Optional[~.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify this application to the service. If
            none are specified, the client will attempt to ascertain
            the credentials from the environment.
        credentials_file (Optional[str]): A file with credentials that can
            be loaded with :func:`google.auth.load_credentials_from_file`.
        scopes (Optional[Sequence[str]]): An optional list of scopes needed
            for this service. These are only used when credentials are not
            specified and are passed to :func:`google.auth.default`.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        kwargs (Optional[dict]): Keyword arguments, which are passed to the
            channel creation.

    Returns:
        aio.Channel: A gRPC AsyncIO channel object.
    """
    new_channel = grpc_helpers_async.create_channel(
        host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        default_scopes=cls.AUTH_SCOPES,
        default_host=cls.DEFAULT_HOST,
        quota_project_id=quota_project_id,
        **kwargs,
    )
    return new_channel

187 

def __init__(
    self,
    *,
    host: str = "firestore.googleapis.com",
    credentials: Optional[ga_credentials.Credentials] = None,
    credentials_file: Optional[str] = None,
    scopes: Optional[Sequence[str]] = None,
    channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
    api_mtls_endpoint: Optional[str] = None,
    client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
    client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
    quota_project_id: Optional[str] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    always_use_jwt_access: Optional[bool] = False,
    api_audience: Optional[str] = None,
) -> None:
    """Set up the transport and its underlying AsyncIO channel.

    Args:
        host (Optional[str]):
            The hostname to connect to (default: 'firestore.googleapis.com').
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. These
            credentials identify the application to the service; if none
            are specified, the client will attempt to ascertain the
            credentials from the environment.
            This argument is ignored if a ``channel`` instance is provided.
        credentials_file (Optional[str]): A file with credentials that can
            be loaded with :func:`google.auth.load_credentials_from_file`.
            This argument is ignored if a ``channel`` instance is provided.
        scopes (Optional[Sequence[str]]): An optional list of scopes needed
            for this service. These are only used when credentials are not
            specified and are passed to :func:`google.auth.default`.
        channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
            A ``Channel`` instance through which to make calls, or a Callable
            that constructs and returns one. If set to None,
            ``self.create_channel`` is used to create the channel. If a
            Callable is given, it will be called with the same arguments as
            used in ``self.create_channel``.
        api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
            endpoint. If provided, it overrides the ``host`` argument and
            tries to create a mutual TLS channel with client SSL credentials
            from ``client_cert_source`` or application default SSL
            credentials.
        client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
            Deprecated. A callback to provide client SSL certificate bytes
            and private key bytes, both in PEM format. It is ignored if
            ``api_mtls_endpoint`` is None.
        ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
            for the grpc channel. It is ignored if a ``channel`` instance is
            provided.
        client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
            A callback to provide client certificate bytes and private key
            bytes, both in PEM format. It is used to configure a mutual TLS
            channel. It is ignored if a ``channel`` instance or
            ``ssl_channel_credentials`` is provided.
        quota_project_id (Optional[str]): An optional project to use for
            billing and quota.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you're developing
            your own client library.
        always_use_jwt_access (Optional[bool]): Whether self signed JWT
            should be used for service account credentials.
        api_audience (Optional[str]): Forwarded unchanged to the base
            transport constructor.

    Raises:
        google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
            creation failed for any reason.
        google.api_core.exceptions.DuplicateCredentialArgs: If both
            ``credentials`` and ``credentials_file`` are passed.
    """
    self._grpc_channel = None
    self._ssl_channel_credentials = ssl_channel_credentials
    self._stubs: Dict[str, Callable] = {}

    # Both legacy mTLS arguments still work but are flagged as deprecated.
    if api_mtls_endpoint:
        warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
    if client_cert_source:
        warnings.warn("client_cert_source is deprecated", DeprecationWarning)

    if isinstance(channel, aio.Channel):
        # A ready-made channel wins: adopt it and drop any credentials,
        # including SSL credentials, that were passed alongside it.
        credentials = None
        self._ignore_credentials = True
        self._grpc_channel = channel
        self._ssl_channel_credentials = None
    elif api_mtls_endpoint:
        # Deprecated mTLS path: the endpoint replaces the host, and SSL
        # credentials come from the callback or the application defaults.
        host = api_mtls_endpoint
        if client_cert_source:
            cert, key = client_cert_source()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        else:
            self._ssl_channel_credentials = SslCredentials().ssl_credentials
    elif client_cert_source_for_mtls and not ssl_channel_credentials:
        # Explicit ``ssl_channel_credentials`` take precedence over the
        # certificate callback, which is only consulted when none were given.
        cert, key = client_cert_source_for_mtls()
        self._ssl_channel_credentials = grpc.ssl_channel_credentials(
            certificate_chain=cert, private_key=key
        )

    # The base transport sets the host, credentials and scopes.
    super().__init__(
        host=host,
        credentials=credentials,
        credentials_file=credentials_file,
        scopes=scopes,
        quota_project_id=quota_project_id,
        client_info=client_info,
        always_use_jwt_access=always_use_jwt_access,
        api_audience=api_audience,
    )

    if not self._grpc_channel:
        # No channel instance was supplied: build one with the provided
        # callable, or fall back to this class's ``create_channel``.
        channel_init = channel or type(self).create_channel
        message_size_options = [
            ("grpc.max_send_message_length", -1),
            ("grpc.max_receive_message_length", -1),
        ]
        self._grpc_channel = channel_init(
            self._host,
            # Use the credentials saved by the base transport;
            # ``credentials_file`` is therefore deliberately None here.
            credentials=self._credentials,
            credentials_file=None,
            scopes=self._scopes,
            ssl_credentials=self._ssl_channel_credentials,
            quota_project_id=quota_project_id,
            options=message_size_options,
        )

    # Attach the logging interceptor to the channel's unary-unary calls.
    self._interceptor = _LoggingClientAIOInterceptor()
    self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
    self._logged_channel = self._grpc_channel

    # Record whether the installed ``wrap_method`` accepts a ``kind`` argument.
    wrap_method_params = inspect.signature(
        gapic_v1.method_async.wrap_method
    ).parameters
    self._wrap_with_kind = "kind" in wrap_method_params

    # Wrap messages. This must be done after self._logged_channel exists.
    self._prep_wrapped_messages(client_info)

333 

@property
def grpc_channel(self) -> aio.Channel:
    """Return the channel used to connect to this service.

    The channel is the one stored on the instance as ``_grpc_channel``,
    so every read of this property yields the same object.
    """
    stored_channel = self._grpc_channel
    return stored_channel

343 

@property
def get_document(
    self,
) -> Callable[[firestore.GetDocumentRequest], Awaitable[document.Document]]:
    r"""Expose the ``GetDocument`` RPC as a callable.

    Gets a single document.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.GetDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "get_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/GetDocument",
        request_serializer=firestore.GetDocumentRequest.serialize,
        response_deserializer=document.Document.deserialize,
    )
    return self._stubs[stub_key]

369 

@property
def list_documents(
    self,
) -> Callable[
    [firestore.ListDocumentsRequest], Awaitable[firestore.ListDocumentsResponse]
]:
    r"""Expose the ``ListDocuments`` RPC as a callable.

    Lists documents.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.ListDocumentsRequest],
                Awaitable[~.ListDocumentsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_documents"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/ListDocuments",
        request_serializer=firestore.ListDocumentsRequest.serialize,
        response_deserializer=firestore.ListDocumentsResponse.deserialize,
    )
    return self._stubs[stub_key]

397 

@property
def update_document(
    self,
) -> Callable[[firestore.UpdateDocumentRequest], Awaitable[gf_document.Document]]:
    r"""Expose the ``UpdateDocument`` RPC as a callable.

    Updates or inserts a document.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.UpdateDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "update_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/UpdateDocument",
        request_serializer=firestore.UpdateDocumentRequest.serialize,
        response_deserializer=gf_document.Document.deserialize,
    )
    return self._stubs[stub_key]

423 

@property
def delete_document(
    self,
) -> Callable[[firestore.DeleteDocumentRequest], Awaitable[empty_pb2.Empty]]:
    r"""Expose the ``DeleteDocument`` RPC as a callable.

    Deletes a document.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.DeleteDocumentRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "delete_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/DeleteDocument",
        request_serializer=firestore.DeleteDocumentRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

449 

@property
def batch_get_documents(
    self,
) -> Callable[
    [firestore.BatchGetDocumentsRequest],
    Awaitable[firestore.BatchGetDocumentsResponse],
]:
    r"""Expose the ``BatchGetDocuments`` RPC as a callable.

    Gets multiple documents.

    Documents returned by this method are not guaranteed to
    be returned in the same order that they were requested.

    The stub is built with the channel's ``unary_stream`` factory the
    first time this property is read and kept in ``self._stubs``;
    later reads return the stored stub.

    Returns:
        Callable[[~.BatchGetDocumentsRequest],
                Awaitable[~.BatchGetDocumentsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "batch_get_documents"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_stream
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/BatchGetDocuments",
        request_serializer=firestore.BatchGetDocumentsRequest.serialize,
        response_deserializer=firestore.BatchGetDocumentsResponse.deserialize,
    )
    return self._stubs[stub_key]

481 

@property
def begin_transaction(
    self,
) -> Callable[
    [firestore.BeginTransactionRequest],
    Awaitable[firestore.BeginTransactionResponse],
]:
    r"""Expose the ``BeginTransaction`` RPC as a callable.

    Starts a new transaction.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.BeginTransactionRequest],
                Awaitable[~.BeginTransactionResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "begin_transaction"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/BeginTransaction",
        request_serializer=firestore.BeginTransactionRequest.serialize,
        response_deserializer=firestore.BeginTransactionResponse.deserialize,
    )
    return self._stubs[stub_key]

510 

@property
def commit(
    self,
) -> Callable[[firestore.CommitRequest], Awaitable[firestore.CommitResponse]]:
    r"""Expose the ``Commit`` RPC as a callable.

    Commits a transaction, while optionally updating
    documents.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.CommitRequest],
                Awaitable[~.CommitResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "commit"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/Commit",
        request_serializer=firestore.CommitRequest.serialize,
        response_deserializer=firestore.CommitResponse.deserialize,
    )
    return self._stubs[stub_key]

537 

@property
def rollback(
    self,
) -> Callable[[firestore.RollbackRequest], Awaitable[empty_pb2.Empty]]:
    r"""Expose the ``Rollback`` RPC as a callable.

    Rolls back a transaction.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.RollbackRequest],
                Awaitable[~.Empty]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "rollback"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/Rollback",
        request_serializer=firestore.RollbackRequest.serialize,
        response_deserializer=empty_pb2.Empty.FromString,
    )
    return self._stubs[stub_key]

563 

@property
def run_query(
    self,
) -> Callable[[firestore.RunQueryRequest], Awaitable[firestore.RunQueryResponse]]:
    r"""Expose the ``RunQuery`` RPC as a callable.

    Runs a query.

    The stub is built with the channel's ``unary_stream`` factory the
    first time this property is read and kept in ``self._stubs``;
    later reads return the stored stub.

    Returns:
        Callable[[~.RunQueryRequest],
                Awaitable[~.RunQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "run_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_stream
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/RunQuery",
        request_serializer=firestore.RunQueryRequest.serialize,
        response_deserializer=firestore.RunQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

589 

@property
def run_aggregation_query(
    self,
) -> Callable[
    [firestore.RunAggregationQueryRequest],
    Awaitable[firestore.RunAggregationQueryResponse],
]:
    r"""Expose the ``RunAggregationQuery`` RPC as a callable.

    Runs an aggregation query.

    Rather than producing [Document][google.firestore.v1.Document]
    results like
    [Firestore.RunQuery][google.firestore.v1.Firestore.RunQuery],
    this API allows running an aggregation to produce a series of
    [AggregationResult][google.firestore.v1.AggregationResult]
    server-side.

    High-Level Example:

    ::

        -- Return the number of documents in table given a filter.
        SELECT COUNT(*) FROM ( SELECT * FROM k where a = true );

    The stub is built with the channel's ``unary_stream`` factory the
    first time this property is read and kept in ``self._stubs``;
    later reads return the stored stub.

    Returns:
        Callable[[~.RunAggregationQueryRequest],
                Awaitable[~.RunAggregationQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "run_aggregation_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_stream
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/RunAggregationQuery",
        request_serializer=firestore.RunAggregationQueryRequest.serialize,
        response_deserializer=firestore.RunAggregationQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

632 

@property
def partition_query(
    self,
) -> Callable[
    [firestore.PartitionQueryRequest], Awaitable[firestore.PartitionQueryResponse]
]:
    r"""Expose the ``PartitionQuery`` RPC as a callable.

    Partitions a query by returning partition cursors
    that can be used to run the query in parallel. The
    returned partition cursors are split points that can be
    used by RunQuery as starting/end points for the query
    results.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.PartitionQueryRequest],
                Awaitable[~.PartitionQueryResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "partition_query"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/PartitionQuery",
        request_serializer=firestore.PartitionQueryRequest.serialize,
        response_deserializer=firestore.PartitionQueryResponse.deserialize,
    )
    return self._stubs[stub_key]

664 

@property
def write(
    self,
) -> Callable[[firestore.WriteRequest], Awaitable[firestore.WriteResponse]]:
    r"""Expose the ``Write`` RPC as a callable.

    Streams batches of document updates and deletes, in
    order. This method is only available via gRPC or
    WebChannel (not REST).

    The stub is built with the channel's ``stream_stream`` factory the
    first time this property is read and kept in ``self._stubs``;
    later reads return the stored stub.

    Returns:
        Callable[[~.WriteRequest],
                Awaitable[~.WriteResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "write"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.stream_stream
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/Write",
        request_serializer=firestore.WriteRequest.serialize,
        response_deserializer=firestore.WriteResponse.deserialize,
    )
    return self._stubs[stub_key]

692 

@property
def listen(
    self,
) -> Callable[[firestore.ListenRequest], Awaitable[firestore.ListenResponse]]:
    r"""Expose the ``Listen`` RPC as a callable.

    Listens to changes. This method is only available via
    gRPC or WebChannel (not REST).

    The stub is built with the channel's ``stream_stream`` factory the
    first time this property is read and kept in ``self._stubs``;
    later reads return the stored stub.

    Returns:
        Callable[[~.ListenRequest],
                Awaitable[~.ListenResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "listen"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.stream_stream
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/Listen",
        request_serializer=firestore.ListenRequest.serialize,
        response_deserializer=firestore.ListenResponse.deserialize,
    )
    return self._stubs[stub_key]

719 

@property
def list_collection_ids(
    self,
) -> Callable[
    [firestore.ListCollectionIdsRequest],
    Awaitable[firestore.ListCollectionIdsResponse],
]:
    r"""Expose the ``ListCollectionIds`` RPC as a callable.

    Lists all the collection IDs underneath a document.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.ListCollectionIdsRequest],
                Awaitable[~.ListCollectionIdsResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "list_collection_ids"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/ListCollectionIds",
        request_serializer=firestore.ListCollectionIdsRequest.serialize,
        response_deserializer=firestore.ListCollectionIdsResponse.deserialize,
    )
    return self._stubs[stub_key]

748 

@property
def batch_write(
    self,
) -> Callable[
    [firestore.BatchWriteRequest], Awaitable[firestore.BatchWriteResponse]
]:
    r"""Expose the ``BatchWrite`` RPC as a callable.

    Applies a batch of write operations.

    The BatchWrite method does not apply the write operations
    atomically and can apply them out of order. Method does not
    allow more than one write per document. Each write succeeds or
    fails independently. See the
    [BatchWriteResponse][google.firestore.v1.BatchWriteResponse] for
    the success status of each write.

    If you require an atomically applied set of writes, use
    [Commit][google.firestore.v1.Firestore.Commit] instead.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.BatchWriteRequest],
                Awaitable[~.BatchWriteResponse]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "batch_write"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/BatchWrite",
        request_serializer=firestore.BatchWriteRequest.serialize,
        response_deserializer=firestore.BatchWriteResponse.deserialize,
    )
    return self._stubs[stub_key]

786 

@property
def create_document(
    self,
) -> Callable[[firestore.CreateDocumentRequest], Awaitable[document.Document]]:
    r"""Expose the ``CreateDocument`` RPC as a callable.

    Creates a new document.

    The stub is built from the logged channel the first time this
    property is read and kept in ``self._stubs``; later reads return
    the stored stub.

    Returns:
        Callable[[~.CreateDocumentRequest],
                Awaitable[~.Document]]:
            A function that, when called, will call the underlying RPC
            on the server.
    """
    stub_key = "create_document"
    if stub_key in self._stubs:
        return self._stubs[stub_key]
    # First access: gRPC only needs the request serializer and the
    # response deserializer to build the stub.
    make_stub = self._logged_channel.unary_unary
    self._stubs[stub_key] = make_stub(
        "/google.firestore.v1.Firestore/CreateDocument",
        request_serializer=firestore.CreateDocumentRequest.serialize,
        response_deserializer=document.Document.deserialize,
    )
    return self._stubs[stub_key]

812 

def _prep_wrapped_messages(self, client_info):
    """Precompute the wrapped methods, overriding the base class method to use async wrappers.

    Populates ``self._wrapped_methods`` with one entry per RPC stub, keyed
    by the stub itself. Each value is produced by ``self._wrap_method``
    with the RPC's default timeout and ``client_info``. RPCs that have a
    default retry policy also receive their own ``retries.AsyncRetry``
    whose ``deadline`` equals that default timeout.
    """
    # Exception groups used as retry predicates by the RPCs below.
    transient_errors = (
        core_exceptions.DeadlineExceeded,
        core_exceptions.InternalServerError,
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )
    capacity_errors = (
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )
    batch_write_errors = (
        core_exceptions.Aborted,
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
    )

    def retrying(rpc, retryable, seconds):
        # Builds a fresh AsyncRetry per RPC; the backoff parameters are
        # the same for every RPC that has a default retry.
        return self._wrap_method(
            rpc,
            default_retry=retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(*retryable),
                deadline=seconds,
            ),
            default_timeout=seconds,
            client_info=client_info,
        )

    def plain(rpc, seconds):
        # No default retry: only the timeout and client info are passed.
        return self._wrap_method(
            rpc,
            default_timeout=seconds,
            client_info=client_info,
        )

    self._wrapped_methods = {
        self.get_document: retrying(self.get_document, transient_errors, 60.0),
        self.list_documents: retrying(self.list_documents, transient_errors, 60.0),
        self.update_document: retrying(self.update_document, capacity_errors, 60.0),
        self.delete_document: retrying(self.delete_document, transient_errors, 60.0),
        self.batch_get_documents: retrying(
            self.batch_get_documents, transient_errors, 300.0
        ),
        self.begin_transaction: retrying(
            self.begin_transaction, transient_errors, 60.0
        ),
        self.commit: retrying(self.commit, capacity_errors, 60.0),
        self.rollback: retrying(self.rollback, transient_errors, 60.0),
        self.run_query: retrying(self.run_query, transient_errors, 300.0),
        self.run_aggregation_query: retrying(
            self.run_aggregation_query, transient_errors, 300.0
        ),
        self.partition_query: retrying(self.partition_query, transient_errors, 300.0),
        self.write: plain(self.write, 86400.0),
        self.listen: retrying(self.listen, transient_errors, 86400.0),
        self.list_collection_ids: retrying(
            self.list_collection_ids, transient_errors, 60.0
        ),
        self.batch_write: retrying(self.batch_write, batch_write_errors, 60.0),
        self.create_document: retrying(self.create_document, capacity_errors, 60.0),
        self.cancel_operation: plain(self.cancel_operation, None),
        self.delete_operation: plain(self.delete_operation, None),
        self.get_operation: plain(self.get_operation, None),
        self.list_operations: plain(self.list_operations, None),
    }

1090 

def _wrap_method(self, func, *args, **kwargs):
    """Wrap ``func`` via ``gapic_v1.method_async.wrap_method``.

    All positional and keyword arguments are forwarded unchanged. When
    ``self._wrap_with_kind`` is truthy, a ``kind`` keyword set to
    ``self.kind`` is added (replacing any ``kind`` already supplied).
    """
    if self._wrap_with_kind:  # pragma: NO COVER
        kwargs.update(kind=self.kind)
    wrapped = gapic_v1.method_async.wrap_method(func, *args, **kwargs)
    return wrapped

1095 

def close(self):
    """Close the logged channel and return whatever its ``close()`` returns."""
    channel = self._logged_channel
    return channel.close()

1098 

@property
def kind(self) -> str:
    """Return the identifier of this transport: ``"grpc_asyncio"``."""
    return "grpc_asyncio"

1102 

@property
def delete_operation(
    self,
) -> Callable[[operations_pb2.DeleteOperationRequest], None]:
    r"""Return a callable for the delete_operation method over gRPC."""
    # The stub is created on first access and cached in ``self._stubs``.
    # No response deserializer is supplied for this RPC.
    stub_cache = self._stubs
    if "delete_operation" not in stub_cache:
        stub_cache["delete_operation"] = self._logged_channel.unary_unary(
            "/google.longrunning.Operations/DeleteOperation",
            request_serializer=operations_pb2.DeleteOperationRequest.SerializeToString,
            response_deserializer=None,
        )
    return stub_cache["delete_operation"]

1119 

@property
def cancel_operation(
    self,
) -> Callable[[operations_pb2.CancelOperationRequest], None]:
    r"""Return a callable for the cancel_operation method over gRPC."""
    # The stub is created on first access and cached in ``self._stubs``.
    # No response deserializer is supplied for this RPC.
    stub_cache = self._stubs
    if "cancel_operation" not in stub_cache:
        stub_cache["cancel_operation"] = self._logged_channel.unary_unary(
            "/google.longrunning.Operations/CancelOperation",
            request_serializer=operations_pb2.CancelOperationRequest.SerializeToString,
            response_deserializer=None,
        )
    return stub_cache["cancel_operation"]

1136 

@property
def get_operation(
    self,
) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
    r"""Return a callable for the get_operation method over gRPC."""
    # The stub is created on first access and cached in ``self._stubs``;
    # gRPC only needs the (de)serialization functions for the messages.
    stub_cache = self._stubs
    if "get_operation" not in stub_cache:
        stub_cache["get_operation"] = self._logged_channel.unary_unary(
            "/google.longrunning.Operations/GetOperation",
            request_serializer=operations_pb2.GetOperationRequest.SerializeToString,
            response_deserializer=operations_pb2.Operation.FromString,
        )
    return stub_cache["get_operation"]

1153 

@property
def list_operations(
    self,
) -> Callable[
    [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
]:
    r"""Return a callable for the list_operations method over gRPC."""
    # The stub is created on first access and cached in ``self._stubs``;
    # gRPC only needs the (de)serialization functions for the messages.
    stub_cache = self._stubs
    if "list_operations" not in stub_cache:
        stub_cache["list_operations"] = self._logged_channel.unary_unary(
            "/google.longrunning.Operations/ListOperations",
            request_serializer=operations_pb2.ListOperationsRequest.SerializeToString,
            response_deserializer=operations_pb2.ListOperationsResponse.FromString,
        )
    return stub_cache["list_operations"]

1172 

1173 

# Names exported by ``from <module> import *``: only the async transport class.
__all__ = (
    "FirestoreGrpcAsyncIOTransport",
)