1# Copyright 2025 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15.. warning::
16 **Preview API**: Firestore Pipelines is currently in preview and is
17 subject to potential breaking changes in future releases.
18"""
19
20from __future__ import annotations
21from typing import Generic, TypeVar, TYPE_CHECKING
22from google.cloud.firestore_v1 import pipeline_stages as stages
23from google.cloud.firestore_v1.base_pipeline import _BasePipeline
24from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER
25
26if TYPE_CHECKING: # pragma: NO COVER
27 from google.cloud.firestore_v1.client import Client
28 from google.cloud.firestore_v1.async_client import AsyncClient
29 from google.cloud.firestore_v1.base_document import BaseDocumentReference
30 from google.cloud.firestore_v1.base_query import BaseQuery
31 from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery
32 from google.cloud.firestore_v1.base_collection import BaseCollectionReference
33
34
# Type variable for the concrete pipeline class, constrained to subclasses of
# ``_BasePipeline``. ``PipelineSource`` is generic over it, and its factory
# methods are annotated as returning it.
PipelineType = TypeVar("PipelineType", bound=_BasePipeline)
36
37
class PipelineSource(Generic[PipelineType]):
    """
    A factory for creating Pipeline instances, which provide a framework for building data
    transformation and query pipelines for Firestore.

    Not meant to be instantiated directly. Instead, start by calling client.pipeline()
    to obtain an instance of PipelineSource. From there, you can use the provided
    methods to specify the data source for your pipeline.
    """

    def __init__(self, client: Client | AsyncClient):
        """
        Args:
            client: the client that new pipelines will be bound to. Its
                ``_pipeline_cls`` attribute is used to build pipeline instances.
        """
        self.client = client

    def _create_pipeline(self, source_stage) -> PipelineType:
        """
        Build a new pipeline whose first stage is ``source_stage``.

        Args:
            source_stage: the stage describing the pipeline's data source
        Returns:
            a new pipeline instance, created by the client's pipeline class
        """
        # The pipeline class is looked up on the client rather than hard-coded,
        # so the kind of pipeline produced follows the client that was supplied.
        return self.client._pipeline_cls._create_with_stages(self.client, source_stage)

    def create_from(
        self, query: BaseQuery | BaseAggregationQuery | BaseCollectionReference
    ) -> PipelineType:
        """
        Create a pipeline from an existing query

        Args:
            query: the query to build the pipeline off of
        Returns:
            a new pipeline instance representing the query
        """
        # The conversion logic lives on the query object; this source is passed
        # in so the query can use it while building the pipeline.
        return query._build_pipeline(self)

    def collection(self, path: str | tuple[str, ...]) -> PipelineType:
        """
        Creates a new Pipeline that operates on a specified Firestore collection.

        Args:
            path: The path to the Firestore collection (e.g., "users"). Can either be:
                * A single ``/``-delimited path to a collection
                * A tuple of collection path segments
        Returns:
            a new pipeline instance targeting the specified collection
        """
        if isinstance(path, tuple):
            # Collapse the segments into a single delimited path string.
            path = DOCUMENT_PATH_DELIMITER.join(path)
        return self._create_pipeline(stages.Collection(path))

    def collection_group(self, collection_id: str) -> PipelineType:
        """
        Creates a new Pipeline that operates on all documents in a collection group.

        Args:
            collection_id: The ID of the collection group
        Returns:
            a new pipeline instance targeting the specified collection group
        """
        return self._create_pipeline(stages.CollectionGroup(collection_id))

    def database(self) -> PipelineType:
        """
        Creates a new Pipeline that operates on all documents in the Firestore database.

        Returns:
            a new pipeline instance targeting the entire database
        """
        return self._create_pipeline(stages.Database())

    def documents(self, *docs: BaseDocumentReference) -> PipelineType:
        """
        Creates a new Pipeline that operates on a specific set of Firestore documents.

        Args:
            docs: The DocumentReference instances representing the documents to include in the pipeline.
        Returns:
            a new pipeline instance targeting the specified documents
        """
        return self._create_pipeline(stages.Documents.of(*docs))