1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15.. warning::
16 **Preview API**: Firestore Pipelines is currently in preview and is
17 subject to potential breaking changes in future releases.
18"""
19
20from __future__ import annotations
21from typing import TYPE_CHECKING
22from google.cloud.firestore_v1 import pipeline_stages as stages
23from google.cloud.firestore_v1.base_pipeline import _BasePipeline
24from google.cloud.firestore_v1.pipeline_result import PipelineStream
25from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
26from google.cloud.firestore_v1.pipeline_result import PipelineResult
27
28if TYPE_CHECKING: # pragma: NO COVER
29 import datetime
30 from google.cloud.firestore_v1.client import Client
31 from google.cloud.firestore_v1.pipeline_expressions import Constant
32 from google.cloud.firestore_v1.transaction import Transaction
33 from google.cloud.firestore_v1.types.document import Value
34 from google.cloud.firestore_v1.query_profile import PipelineExplainOptions
35
36
class Pipeline(_BasePipeline):
    """
    Pipelines allow for complex data transformations and queries involving
    multiple stages like filtering, projection, aggregation, and vector search.

    Usage Example:
    >>> from google.cloud.firestore_v1.pipeline_expressions import Field
    >>>
    >>> def run_pipeline():
    ...     client = Client(...)
    ...     pipeline = (client.pipeline()
    ...         .collection("books")
    ...         .where(Field.of("published").gt(1980))
    ...         .select("title", "author"))
    ...     for result in pipeline.execute():
    ...         print(result)

    Use `client.pipeline()` to create instances of this class.

    .. warning::
        **Preview API**: Firestore Pipelines is currently in preview and is
        subject to potential breaking changes in future releases.
    """

    def __init__(self, client: Client, *stages: stages.Stage):
        """
        Initializes a Pipeline.

        Args:
            client: The `Client` instance to use for execution.
            *stages: Initial stages for the pipeline.
        """
        super().__init__(client, *stages)

    def execute(
        self,
        *,
        transaction: "Transaction" | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> PipelineSnapshot[PipelineResult]:
        """
        Executes this pipeline and returns results as a list

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned list.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options).
                Defaults to no additional options.
        """
        # Pass options explicitly rather than harvesting locals(): that idiom
        # silently captures any new local defined before the comprehension.
        stream = PipelineStream(
            PipelineResult,
            self,
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            # Normalize here instead of using a mutable `{}` default argument,
            # which would be shared across all calls.
            additional_options={} if additional_options is None else additional_options,
        )
        # Drain the stream eagerly so the snapshot holds the full result list.
        return PipelineSnapshot(list(stream), stream)

    def stream(
        self,
        *,
        transaction: "Transaction" | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> PipelineStream[PipelineResult]:
        """
        Process this pipeline as a stream, providing results through an Iterable

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options).
                Defaults to no additional options.
        """
        # Explicit keyword pass-through (see execute); `None` default avoids the
        # shared mutable-default pitfall while keeping the effective behavior.
        return PipelineStream(
            PipelineResult,
            self,
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            additional_options={} if additional_options is None else additional_options,
        )