1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
.. warning::
    **Preview API**: Firestore Pipelines is currently in preview and is
    subject to potential breaking changes in future releases.
18"""
19
20from __future__ import annotations
21from typing import TYPE_CHECKING
22from google.cloud.firestore_v1 import pipeline_stages as stages
23from google.cloud.firestore_v1.base_pipeline import _BasePipeline
24from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream
25from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
26from google.cloud.firestore_v1.pipeline_result import PipelineResult
27
28if TYPE_CHECKING: # pragma: NO COVER
29 import datetime
30 from google.cloud.firestore_v1.async_client import AsyncClient
31 from google.cloud.firestore_v1.async_transaction import AsyncTransaction
32 from google.cloud.firestore_v1.pipeline_expressions import Constant
33 from google.cloud.firestore_v1.types.document import Value
34 from google.cloud.firestore_v1.query_profile import PipelineExplainOptions
35
36
class AsyncPipeline(_BasePipeline):
    """
    Pipelines allow for complex data transformations and queries involving
    multiple stages like filtering, projection, aggregation, and vector search.

    This class extends `_BasePipeline` and provides methods to execute the
    defined pipeline stages using an asynchronous `AsyncClient`.

    Usage Example:
        >>> from google.cloud.firestore_v1.pipeline_expressions import Field
        >>>
        >>> async def run_pipeline():
        ...     client = AsyncClient(...)
        ...     pipeline = (
        ...         client.pipeline()
        ...         .collection("books")
        ...         .where(Field.of("published").gt(1980))
        ...         .select("title", "author")
        ...     )
        ...     async for result in pipeline.stream():
        ...         print(result)

    Use `client.pipeline()` to create instances of this class.

    .. warning::
        **Preview API**: Firestore Pipelines is currently in preview and is
        subject to potential breaking changes in future releases.
    """

    def __init__(self, client: AsyncClient, *stages: stages.Stage):
        """
        Initializes an asynchronous Pipeline.

        Args:
            client: The asynchronous `AsyncClient` instance to use for execution.
            *stages: Initial stages for the pipeline.
        """
        super().__init__(client, *stages)

    async def execute(
        self,
        *,
        transaction: "AsyncTransaction" | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> PipelineSnapshot[PipelineResult]:
        """
        Executes this pipeline and returns results as a list.

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned list.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options)

        Returns:
            PipelineSnapshot[PipelineResult]: The fully-materialized pipeline
            results, backed by the stream that produced them.
        """
        # Delegate to stream() so both entry points build the underlying
        # AsyncPipelineStream identically, then materialize all results.
        stream = self.stream(
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            additional_options=additional_options,
        )
        results = [result async for result in stream]
        return PipelineSnapshot(results, stream)

    def stream(
        self,
        *,
        transaction: "AsyncTransaction" | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> AsyncPipelineStream[PipelineResult]:
        """
        Process this pipeline as a stream, providing results through an AsyncIterable.

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options)

        Returns:
            AsyncPipelineStream[PipelineResult]: An async iterable yielding
            pipeline results as they arrive.
        """
        # `additional_options` defaults to None rather than {} to avoid the
        # shared-mutable-default-argument pitfall; normalize it to a fresh
        # dict here before handing it downstream.
        options = {} if additional_options is None else additional_options
        return AsyncPipelineStream(
            PipelineResult,
            self,
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            additional_options=options,
        )