1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15.. warning::
16 **Preview API**: Firestore Pipelines is currently in preview and is
17 subject to potential breaking changes in future releases.
18"""
19
20from __future__ import annotations
21from typing import Generic, TypeVar, TYPE_CHECKING
22from google.cloud.firestore_v1 import pipeline_stages as stages
23from google.cloud.firestore_v1.base_pipeline import _BasePipeline
24from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER
25
26if TYPE_CHECKING: # pragma: NO COVER
27 from google.cloud.firestore_v1.client import Client
28 from google.cloud.firestore_v1.async_client import AsyncClient
29 from google.cloud.firestore_v1.base_document import BaseDocumentReference
30 from google.cloud.firestore_v1.base_query import BaseQuery
31 from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery
32 from google.cloud.firestore_v1.base_collection import BaseCollectionReference
33
34
# Type variable for the concrete pipeline class returned by PipelineSource's
# factory methods; bounded so that only _BasePipeline subclasses are accepted.
PipelineType = TypeVar("PipelineType", bound=_BasePipeline)
36
37
class PipelineSource(Generic[PipelineType]):
    """
    A factory for creating Pipeline instances, which provide a framework for building data
    transformation and query pipelines for Firestore.

    Not meant to be instantiated directly. Instead, start by calling client.pipeline()
    to obtain an instance of PipelineSource. From there, you can use the provided
    methods to specify the data source for your pipeline.
    """

    def __init__(self, client: Client | AsyncClient):
        """
        Args:
            client: the client the created pipelines will be attached to
        """
        self.client = client

    def _create_pipeline(self, source_stage) -> PipelineType:
        """
        Build a new pipeline that starts with the given source stage.

        The pipeline class is looked up on the client (``_pipeline_cls``) rather
        than hard-coded, so the pipeline type follows the client in use.

        Args:
            source_stage: the stage object to use as the pipeline's data source
        Returns:
            a new pipeline instance bound to this source's client
        """
        return self.client._pipeline_cls._create_with_stages(self.client, source_stage)

    def create_from(
        self, query: BaseQuery | BaseAggregationQuery | BaseCollectionReference
    ) -> PipelineType:
        """
        Create a pipeline from an existing query

        Queries containing a `cursor` or `limit_to_last` are not currently supported

        Args:
            query: the query to build the pipeline off of
        Raises:
            - NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
        Returns:
            a new pipeline instance representing the query
        """
        # The conversion is delegated to the query object, which receives this
        # source so it can create the pipeline on the same client.
        return query._build_pipeline(self)

    def collection(self, path: str | tuple[str, ...]) -> PipelineType:
        """
        Creates a new Pipeline that operates on a specified Firestore collection.

        Args:
            path: The path to the Firestore collection (e.g., "users"). Can either be:
                * A single ``/``-delimited path to a collection
                * A tuple of collection path segments
        Returns:
            a new pipeline instance targeting the specified collection
        """
        # Normalize the tuple form into a single delimited path string.
        if isinstance(path, tuple):
            path = DOCUMENT_PATH_DELIMITER.join(path)
        return self._create_pipeline(stages.Collection(path))

    def collection_group(self, collection_id: str) -> PipelineType:
        """
        Creates a new Pipeline that operates on all documents in a collection group.

        Args:
            collection_id: The ID of the collection group
        Returns:
            a new pipeline instance targeting the specified collection group
        """
        return self._create_pipeline(stages.CollectionGroup(collection_id))

    def database(self) -> PipelineType:
        """
        Creates a new Pipeline that operates on all documents in the Firestore database.

        Returns:
            a new pipeline instance targeting the entire database
        """
        return self._create_pipeline(stages.Database())

    def documents(self, *docs: BaseDocumentReference) -> PipelineType:
        """
        Creates a new Pipeline that operates on a specific set of Firestore documents.

        Args:
            docs: The DocumentReference instances representing the documents to include in the pipeline.
        Returns:
            a new pipeline instance targeting the specified documents
        """
        return self._create_pipeline(stages.Documents.of(*docs))