1# Copyright 2020 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Client for interacting with the Google Cloud Firestore API. 
    16 
    17This is the base from which all interactions with the API occur. 
    18 
    19In the hierarchy of API concepts 
    20 
* a :class:`~google.cloud.firestore_v1.async_client.AsyncClient` owns a
  :class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`
* a :class:`~google.cloud.firestore_v1.async_client.AsyncClient` owns a
  :class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`
    25""" 
    26from __future__ import annotations 
    27 
    28from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, List, Optional, Union 
    29 
    30from google.api_core import gapic_v1 
    31from google.api_core import retry_async as retries 
    32 
    33from google.cloud.firestore_v1.async_batch import AsyncWriteBatch 
    34from google.cloud.firestore_v1.async_collection import AsyncCollectionReference 
    35from google.cloud.firestore_v1.async_document import ( 
    36    AsyncDocumentReference, 
    37    DocumentSnapshot, 
    38) 
    39from google.cloud.firestore_v1.async_query import AsyncCollectionGroup 
    40from google.cloud.firestore_v1.async_transaction import AsyncTransaction 
    41from google.cloud.firestore_v1.base_client import _parse_batch_get  # type: ignore 
    42from google.cloud.firestore_v1.base_client import _CLIENT_INFO, BaseClient, _path_helper 
    43from google.cloud.firestore_v1.field_path import FieldPath 
    44from google.cloud.firestore_v1.services.firestore import ( 
    45    async_client as firestore_client, 
    46) 
    47from google.cloud.firestore_v1.services.firestore.transports import ( 
    48    grpc_asyncio as firestore_grpc_transport, 
    49) 
    50 
    51if TYPE_CHECKING:  # pragma: NO COVER 
    52    import datetime 
    53 
    54    from google.cloud.firestore_v1.bulk_writer import BulkWriter 
    55 
    56 
    57class AsyncClient(BaseClient): 
    58    """Client for interacting with Google Cloud Firestore API. 
    59 
    60    .. note:: 
    61 
    62        Since the Cloud Firestore API requires the gRPC transport, no 
    63        ``_http`` argument is accepted by this class. 
    64 
    65    Args: 
    66        project (Optional[str]): The project which the client acts on behalf 
    67            of. If not passed, falls back to the default inferred 
    68            from the environment. 
    69        credentials (Optional[~google.auth.credentials.Credentials]): The 
    70            OAuth2 Credentials to use for this client. If not passed, falls 
    71            back to the default inferred from the environment. 
    72        database (Optional[str]): The database name that the client targets. 
    73            For now, :attr:`DEFAULT_DATABASE` (the default value) is the 
    74            only valid database. 
    75        client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]): 
    76            The client info used to send a user-agent string along with API 
    77            requests. If ``None``, then default info will be used. Generally, 
    78            you only need to set this if you're developing your own library 
    79            or partner tool. 
    80        client_options (Union[dict, google.api_core.client_options.ClientOptions]): 
    81            Client options used to set user options on the client. API Endpoint 
    82            should be set through client_options. 
    83    """ 
    84 
    def __init__(
        self,
        project=None,
        credentials=None,
        database=None,
        client_info=_CLIENT_INFO,
        client_options=None,
    ) -> None:
        """Set up the async client by delegating to the base class.

        Every argument is forwarded unchanged, by keyword, to the parent
        initializer; nothing else is configured here. See the class
        docstring for the meaning of each argument.
        """
        base_kwargs = {
            "project": project,
            "credentials": credentials,
            "database": database,
            "client_info": client_info,
            "client_options": client_options,
        }
        super().__init__(**base_kwargs)
    100 
    def _to_sync_copy(self):
        """Return a synchronous ``Client`` configured like this client.

        The copy is created on first use from this client's project,
        credentials, database, client info and client options, stored on
        ``self._sync_copy``, and handed back on subsequent calls.
        """
        # Local import; presumably avoids a circular import between the sync
        # and async client modules -- TODO confirm.
        from google.cloud.firestore_v1.client import Client

        cached = getattr(self, "_sync_copy", None)
        if cached:
            return cached

        self._sync_copy = Client(
            project=self.project,
            credentials=self._credentials,
            database=self._database,
            client_info=self._client_info,
            client_options=self._client_options,
        )
        return self._sync_copy
    113 
    @property
    def _firestore_api(self):
        """GAPIC Firestore API object used by this client.

        The work is delegated to ``_firestore_api_helper``, which is given
        the asyncio gRPC transport class, the async GAPIC client class and
        the GAPIC ``async_client`` module.

        Returns:
            Whatever ``_firestore_api_helper`` produces; presumably a
            ``FirestoreAsyncClient`` carrying the credentials of the current
            client (the helper is defined outside this class body).
        """
        transport_cls = firestore_grpc_transport.FirestoreGrpcAsyncIOTransport
        gapic_client_cls = firestore_client.FirestoreAsyncClient
        return self._firestore_api_helper(
            transport_cls, gapic_client_cls, firestore_client
        )
    126 
    @property
    def _target(self):
        """Location of the API, e.g. "firestore.googleapis.com".

        Resolved by ``_target_helper`` from the async GAPIC client class.

        Returns:
            str: The location of the API.
        """
        gapic_client_cls = firestore_client.FirestoreAsyncClient
        return self._target_helper(gapic_client_cls)
    136 
    def collection(self, *collection_path: str) -> AsyncCollectionReference:
        """Build a reference to a collection owned by this client.

        A top-level collection is addressed by its ID:

        .. code-block:: python

            >>> client.collection('top')

        A sub-collection may be addressed with one ``/``-delimited string
        or with one argument per path segment:

        .. code-block:: python

            >>> client.collection('mydocs/doc/subcol')
            >>> # is the same as
            >>> client.collection('mydocs', 'doc', 'subcol')

        Deeper nesting works the same way.

        Args:
            collection_path: Either

                * a single ``/``-delimited path to a collection, or
                * the individual collection path segments.

        Returns:
            :class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`:
            A reference to a collection in the Firestore database.
        """
        # ``_path_helper`` turns the positional arguments into path segments.
        path_segments = _path_helper(collection_path)
        return AsyncCollectionReference(*path_segments, client=self)
    167 
    def collection_group(self, collection_id: str) -> AsyncCollectionGroup:
        """Create a query over every collection that shares an ID.

        The returned query includes all documents in the database that are
        contained in a collection or subcollection with the given
        ``collection_id``.

        .. code-block:: python

            >>> query = client.collection_group('mygroup')

        Args:
            collection_id (str): Identifies the collections to query over.
                Every collection or subcollection with this ID as the last
                segment of its path will be included. Cannot contain a slash.

        Returns:
            :class:`~google.cloud.firestore_v1.async_query.AsyncCollectionGroup`:
            The created query.
        """
        group_parent = self._get_collection_reference(collection_id)
        return AsyncCollectionGroup(group_parent)
    189 
    def document(self, *document_path: str) -> AsyncDocumentReference:
        """Build a reference to a document in a collection.

        A top-level document may be addressed with one ``/``-delimited
        string or with one argument per path segment:

        .. code-block:: python

            >>> client.document('collek/shun')
            >>> # is the same as
            >>> client.document('collek', 'shun')

        The same holds for a document in a sub-collection:

        .. code-block:: python

            >>> client.document('mydocs/doc/subcol/child')
            >>> # is the same as
            >>> client.document('mydocs', 'doc', 'subcol', 'child')

        Deeper nesting works the same way.

        Args:
            document_path: Either

                * a single ``/``-delimited path to a document, or
                * the individual document path segments.

        Returns:
            :class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`:
            A reference to a document in a collection.
        """
        path_segments = self._document_path_helper(*document_path)
        return AsyncDocumentReference(*path_segments, client=self)
    224 
    async def get_all(
        self,
        references: List[AsyncDocumentReference],
        field_paths: Iterable[str] | None = None,
        transaction: AsyncTransaction | None = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> AsyncGenerator[DocumentSnapshot, Any]:
        """Fetch a batch of documents and yield a snapshot for each response.

        .. note::

           Snapshots are not guaranteed to arrive in the same order as the
           entries of ``references``.

        .. note::

           If several ``references`` point at the same document, the server
           will only return one result for it.

        See :meth:`~google.cloud.firestore_v1.client.Client.field_path` for
        more information on **field paths**.

        This method cannot be used with a ``transaction`` that already has
        write operations added (i.e. read-after-write is not allowed).

        Args:
            references (List[.AsyncDocumentReference, ...]): The document
                references to be retrieved.
            field_paths (Optional[Iterable[str, ...]]): An iterable of field
                paths (``.``-delimited list of field names) to use as a
                projection of document fields in the returned results. If
                no value is provided, all fields will be returned.
            transaction (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that these ``references`` will be
                retrieved in.
            retry (google.api_core.retry_async.AsyncRetry): Designation of
                what errors, if any, should be retried. Defaults to
                ``gapic_v1.method.DEFAULT``.
            timeout (Optional[float]): The timeout for this request.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            .DocumentSnapshot: One snapshot per response received from the
            server, as built by ``_parse_batch_get``.
        """
        prepared = self._prep_get_all(
            references, field_paths, transaction, retry, timeout, read_time
        )
        request, reference_map, call_kwargs = prepared

        # Awaiting the RPC yields an async stream of per-document responses.
        stream = await self._firestore_api.batch_get_documents(
            request=request,
            metadata=self._rpc_metadata,
            **call_kwargs,
        )

        async for batch_response in stream:
            snapshot = _parse_batch_get(batch_response, reference_map, self)
            yield snapshot
    289 
    async def collections(
        self,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> AsyncGenerator[AsyncCollectionReference, Any]:
        """List top-level collections of the client's database.

        Args:
            retry (google.api_core.retry_async.AsyncRetry): Designation of
                what errors, if any, should be retried. Defaults to
                ``gapic_v1.method.DEFAULT``.
            timeout (Optional[float]): The timeout for this request.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
                is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
                timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

        Yields:
            :class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`:
            A reference for each collection ID returned by the
            ``list_collection_ids`` call.
        """
        request, call_kwargs = self._prep_collections(retry, timeout, read_time)

        id_stream = await self._firestore_api.list_collection_ids(
            request=request,
            metadata=self._rpc_metadata,
            **call_kwargs,
        )

        # Wrap every returned ID in a reference bound to this client.
        async for top_level_id in id_stream:
            yield self.collection(top_level_id)
    322 
    async def recursive_delete(
        self,
        reference: Union[AsyncCollectionReference, AsyncDocumentReference],
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: int = 5000,
    ) -> int:
        """Delete documents and their subcollections, whatever their
        collection name.

        Given an AsyncCollectionReference, every document in the collection
        is deleted together with all of its descendants.

        Given an AsyncDocumentReference, that one document is deleted
        together with all of its descendants.

        Args:
            reference (Union[
                :class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`,
                :class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`,
            ]):
                The reference to be deleted.

            bulk_writer (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`]):
                The BulkWriter used to delete all matching documents. Supply
                this if you want to override the default throttling
                behavior. When omitted, ``self.bulk_writer()`` provides one.

            chunk_size (int): Forwarded unchanged to ``_recursive_delete``.
                Defaults to 5000.

        Returns:
            int: The count reported by ``_recursive_delete``, i.e. the number
            of delete operations handed to the bulk writer.
        """
        # NOTE: ``_recursive_delete`` closes the writer once the top-level
        # call finishes, including a writer supplied by the caller.
        writer = self.bulk_writer() if bulk_writer is None else bulk_writer
        deleted_count = await self._recursive_delete(
            reference,
            bulk_writer=writer,
            chunk_size=chunk_size,
        )
        return deleted_count
    358 
    async def _recursive_delete(
        self,
        reference: Union[AsyncCollectionReference, AsyncDocumentReference],
        bulk_writer: "BulkWriter",
        *,
        chunk_size: int = 5000,
        depth: int = 0,
    ) -> int:
        """Recursion helper for ``recursive_delete``.

        Args:
            reference: The collection or document whose contents are deleted.
            bulk_writer: Writer that receives one ``delete`` call per document.
            chunk_size: Passed to the query's ``_chunkify`` when walking a
                collection.
            depth: Recursion level; ``0`` marks the outermost call, which is
                the one that closes ``bulk_writer``.

        Returns:
            int: Number of ``bulk_writer.delete`` calls made by this call and
            the calls nested beneath it.

        Raises:
            TypeError: If ``reference`` is neither an AsyncCollectionReference
                nor an AsyncDocumentReference.
        """
        supported_types = (AsyncCollectionReference, AsyncDocumentReference)
        if not isinstance(reference, supported_types):
            raise TypeError(
                f"Unexpected type for reference: {reference.__class__.__name__}"
            )

        deleted_count = 0

        if isinstance(reference, AsyncCollectionReference):
            # Recursive query over the collection, projected onto the
            # document ID field only.
            id_only_query = reference.recursive().select([FieldPath.document_id()])
            async for snapshots in id_only_query._chunkify(chunk_size):
                for snapshot in snapshots:
                    deleted_count += 1
                    bulk_writer.delete(snapshot.reference)
        else:
            # Document: empty each of its subcollections first, then delete
            # the document itself.
            async for sub_collection in reference.collections():
                deleted_count += await self._recursive_delete(
                    sub_collection,
                    bulk_writer=bulk_writer,
                    depth=depth + 1,
                    chunk_size=chunk_size,
                )
            deleted_count += 1
            bulk_writer.delete(reference)

        # Only the outermost call closes the writer.
        if depth == 0:
            bulk_writer.close()

        return deleted_count
    402 
    def batch(self) -> AsyncWriteBatch:
        """Create a write batch bound to this client.

        Returns:
            :class:`~google.cloud.firestore_v1.async_batch.AsyncWriteBatch`:
            A "write" batch to be used for accumulating document changes and
            sending the changes all at once.
        """
        write_batch = AsyncWriteBatch(self)
        return write_batch
    412 
    def transaction(self, **kwargs) -> AsyncTransaction:
        """Create a transaction that uses this client.

        See :class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`
        for more information on transactions and the constructor arguments.

        Args:
            kwargs (Dict[str, Any]): Keyword arguments (other than
                ``client``) handed on to the
                :class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`
                constructor.

        Returns:
            :class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`:
            A transaction attached to this client.
        """
        new_transaction = AsyncTransaction(self, **kwargs)
        return new_transaction