1# Copyright 2017 Google LLC All rights reserved. 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helpers for applying Google Cloud Firestore changes in a transaction.""" 
    16from __future__ import annotations 
    17 
    18from typing import ( 
    19    TYPE_CHECKING, 
    20    Any, 
    21    AsyncGenerator, 
    22    Coroutine, 
    23    Generator, 
    24    Optional, 
    25    Union, 
    26) 
    27 
    28from google.api_core import retry as retries 
    29 
    30from google.cloud.firestore_v1 import types 
    31 
    32# Types needed only for Type Hints 
    33if TYPE_CHECKING:  # pragma: NO COVER 
    34    from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 
    35    from google.cloud.firestore_v1.document import DocumentSnapshot 
    36    from google.cloud.firestore_v1.query_profile import ExplainOptions 
    37    from google.cloud.firestore_v1.stream_generator import StreamGenerator 
    38    from google.cloud.firestore_v1.types import write as write_pb 
    39 
    40    import datetime 
    41 
    42 
    43MAX_ATTEMPTS = 5 
    44"""int: Default number of transaction attempts (with retries).""" 
    45_CANT_BEGIN: str = "The transaction has already begun. Current transaction ID: {!r}." 
    46_MISSING_ID_TEMPLATE: str = "The transaction has no transaction ID, so it cannot be {}." 
    47_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back") 
    48_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed") 
    49_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction." 
    50_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts." 
    51_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried." 
    52 
    53 
    54class BaseTransaction(object): 
    55    """Accumulate read-and-write operations to be sent in a transaction. 
    56 
    57    Args: 
    58        max_attempts (Optional[int]): The maximum number of attempts for 
    59            the transaction (i.e. allowing retries). Defaults to 
    60            :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`. 
    61        read_only (Optional[bool]): Flag indicating if the transaction 
    62            should be read-only or should allow writes. Defaults to 
    63            :data:`False`. 
    64    """ 
    65 
    66    def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False) -> None: 
    67        self._max_attempts = max_attempts 
    68        self._read_only = read_only 
    69        self._id = None 
    70 
    71    def _add_write_pbs(self, write_pbs: list[write_pb.Write]): 
    72        raise NotImplementedError 
    73 
    74    def _options_protobuf( 
    75        self, retry_id: Union[bytes, None] 
    76    ) -> Optional[types.common.TransactionOptions]: 
    77        """Convert the current object to protobuf. 
    78 
    79        The ``retry_id`` value is used when retrying a transaction that 
    80        failed (e.g. due to contention). It is intended to be the "first" 
    81        transaction that failed (i.e. if multiple retries are needed). 
    82 
    83        Args: 
    84            retry_id (Union[bytes, NoneType]): Transaction ID of a transaction 
    85                to be retried. 
    86 
    87        Returns: 
    88            Optional[google.cloud.firestore_v1.types.TransactionOptions]: 
    89            The protobuf ``TransactionOptions`` if ``read_only==True`` or if 
    90            there is a transaction ID to be retried, else :data:`None`. 
    91 
    92        Raises: 
    93            ValueError: If ``retry_id`` is not :data:`None` but the 
    94                transaction is read-only. 
    95        """ 
    96        if retry_id is not None: 
    97            if self._read_only: 
    98                raise ValueError(_CANT_RETRY_READ_ONLY) 
    99 
    100            return types.TransactionOptions( 
    101                read_write=types.TransactionOptions.ReadWrite( 
    102                    retry_transaction=retry_id 
    103                ) 
    104            ) 
    105        elif self._read_only: 
    106            return types.TransactionOptions( 
    107                read_only=types.TransactionOptions.ReadOnly() 
    108            ) 
    109        else: 
    110            return None 
    111 
    112    @property 
    113    def in_progress(self): 
    114        """Determine if this transaction has already begun. 
    115 
    116        Returns: 
    117            bool: Indicates if the transaction has started. 
    118        """ 
    119        return self._id is not None 
    120 
    121    @property 
    122    def id(self): 
    123        """Get the current transaction ID. 
    124 
    125        Returns: 
    126            Optional[bytes]: The transaction ID (or :data:`None` if the 
    127            current transaction is not in progress). 
    128        """ 
    129        return self._id 
    130 
    131    def _clean_up(self) -> None: 
    132        """Clean up the instance after :meth:`_rollback`` or :meth:`_commit``. 
    133 
    134        This intended to occur on success or failure of the associated RPCs. 
    135        """ 
    136        self._write_pbs: list[write_pb.Write] = [] 
    137        self._id = None 
    138 
    139    def _begin(self, retry_id=None): 
    140        raise NotImplementedError 
    141 
    142    def _rollback(self): 
    143        raise NotImplementedError 
    144 
    145    def _commit(self) -> Union[list, Coroutine[Any, Any, list]]: 
    146        raise NotImplementedError 
    147 
    148    def get_all( 
    149        self, 
    150        references: list, 
    151        retry: retries.Retry | retries.AsyncRetry | object | None = None, 
    152        timeout: float | None = None, 
    153        *, 
    154        read_time: datetime.datetime | None = None, 
    155    ) -> ( 
    156        Generator[DocumentSnapshot, Any, None] 
    157        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]] 
    158    ): 
    159        raise NotImplementedError 
    160 
    161    def get( 
    162        self, 
    163        ref_or_query, 
    164        retry: retries.Retry | retries.AsyncRetry | object | None = None, 
    165        timeout: float | None = None, 
    166        *, 
    167        explain_options: Optional[ExplainOptions] = None, 
    168        read_time: Optional[datetime.datetime] = None, 
    169    ) -> ( 
    170        StreamGenerator[DocumentSnapshot] 
    171        | Generator[DocumentSnapshot, Any, None] 
    172        | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]] 
    173        | Coroutine[Any, Any, AsyncStreamGenerator[DocumentSnapshot]] 
    174    ): 
    175        raise NotImplementedError 
    176 
    177 
    178class _BaseTransactional(object): 
    179    """Provide a callable object to use as a transactional decorater. 
    180 
    181    This is surfaced via 
    182    :func:`~google.cloud.firestore_v1.transaction.transactional`. 
    183 
    184    Args: 
    185        to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]): 
    186            A callable that should be run (and retried) in a transaction. 
    187    """ 
    188 
    189    def __init__(self, to_wrap) -> None: 
    190        self.to_wrap = to_wrap 
    191        self.current_id = None 
    192        """Optional[bytes]: The current transaction ID.""" 
    193        self.retry_id = None 
    194        """Optional[bytes]: The ID of the first attempted transaction.""" 
    195 
    196    def _reset(self) -> None: 
    197        """Unset the transaction IDs.""" 
    198        self.current_id = None 
    199        self.retry_id = None 
    200 
    201    def _pre_commit(self, transaction, *args, **kwargs): 
    202        raise NotImplementedError 
    203 
    204    def __call__(self, transaction, *args, **kwargs): 
    205        raise NotImplementedError