# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
from typing import Sequence, TYPE_CHECKING
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.types.pipeline import (
    StructuredPipeline as StructuredPipeline_pb,
)
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.pipeline_expressions import (
    AggregateFunction,
    AliasedExpression,
    Expression,
    Field,
    BooleanExpression,
    Selectable,
)

if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1.client import Client
    from google.cloud.firestore_v1.async_client import AsyncClient


class _BasePipeline:
    """
    Base class for building Firestore data transformation and query pipelines.

    This class is not intended to be instantiated directly.
    Use `client.pipeline()` to create pipeline instances.
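
    Example:
        A minimal sketch of chaining stages, assuming a `books` collection
        exists and `client` is a `Client` instance:

        >>> from google.cloud.firestore_v1.pipeline_expressions import Field
        >>> pipeline = (
        ...     client.pipeline()
        ...     .collection("books")
        ...     .where(Field.of("rating").gt(4.0))
        ...     .sort(Field.of("rating").descending())
        ...     .limit(10)
        ... )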
    """

    def __init__(self, client: Client | AsyncClient):
        """
        Initializes a new pipeline.

        Pipelines should not be instantiated directly. Instead,
        call `client.pipeline()` to create an instance.

        Args:
            client: The client associated with the pipeline
        """
        self._client = client
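        # Stages are kept in an immutable tuple; stage methods return new
        # pipeline instances via _append() instead of mutating this one.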
        self.stages: Sequence[stages.Stage] = tuple()

    @classmethod
    def _create_with_stages(
        cls, client: Client | AsyncClient, *stages
    ) -> _BasePipeline:
        """
        Initializes a new pipeline with the given stages.

        Pipeline classes should not be instantiated directly.

        Args:
            client: The client associated with the pipeline
            *stages: Initial stages for the pipeline.
        """
        new_instance = cls(client)
        new_instance.stages = tuple(stages)
        return new_instance

    def __repr__(self):
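        """Returns a readable representation of the pipeline, listing its stages."""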
        cls_str = type(self).__name__
        if not self.stages:
            return f"{cls_str}()"
        elif len(self.stages) == 1:
            return f"{cls_str}({self.stages[0]!r})"
        else:
            stages_str = ",\n ".join([repr(s) for s in self.stages])
            return f"{cls_str}(\n {stages_str}\n)"

    def _to_pb(self, **options) -> StructuredPipeline_pb:
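        """Serializes this pipeline's stages and options into a `StructuredPipeline` protobuf message."""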
        return StructuredPipeline_pb(
            pipeline={"stages": [s._to_pb() for s in self.stages]},
            options=options,
        )

    def _append(self, new_stage):
        """
        Creates a new Pipeline object with a new stage appended.
        """
        return self.__class__._create_with_stages(self._client, *self.stages, new_stage)

    def add_fields(self, *fields: Selectable) -> "_BasePipeline":
        """
        Adds new fields to outputs from previous stages.

        This stage allows you to compute values on the fly based on existing data
        from previous stages or constants. You can use this to create new fields
        or overwrite existing ones (if there is name overlap).

        The added fields are defined using `Selectable` expressions, which can be:
        - `Field`: References an existing document field.
        - `Function`: Performs a calculation using functions like `add` or
          `multiply`, with aliases assigned using `Expression.as_()`.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field, add
            >>> pipeline = client.pipeline().collection("books")
            >>> pipeline = pipeline.add_fields(
            ...     Field.of("rating").as_("bookRating"),  # Rename 'rating' to 'bookRating'
            ...     add(5, Field.of("quantity")).as_("totalCost")  # Calculate 'totalCost'
            ... )

        Args:
            *fields: The fields to add to the documents, specified as `Selectable`
                expressions.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.AddFields(*fields))

    def remove_fields(self, *fields: Field | str) -> "_BasePipeline":
        """
        Removes fields from outputs of previous stages.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Remove by name
            >>> pipeline = pipeline.remove_fields("rating", "cost")
            >>> # Remove by Field object
            >>> pipeline = pipeline.remove_fields(Field.of("rating"), Field.of("cost"))

        Args:
            *fields: The fields to remove, specified as field names (str) or
                `Field` objects.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.RemoveFields(*fields))

    def select(self, *selections: str | Selectable) -> "_BasePipeline":
        """
        Selects or creates a set of fields from the outputs of previous stages.

        The selected fields are defined using `Selectable` expressions or field names:
        - `Field`: References an existing document field.
        - `Function`: Represents the result of a function with an assigned alias
          name using `Expression.as_()`.
        - `str`: The name of an existing field.

        If no selections are provided, the output of this stage is empty. Use
        `add_fields()` instead if only additions are desired.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Select by name
            >>> pipeline = pipeline.select("name", "address")
            >>> # Select using Field and Function expressions
            >>> pipeline = pipeline.select(
            ...     Field.of("name"),
            ...     Field.of("address").to_upper().as_("upperAddress"),
            ... )

        Args:
            *selections: The fields to include in the output documents, specified as
                field names (str) or `Selectable` expressions.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Select(*selections))

    def where(self, condition: BooleanExpression) -> "_BasePipeline":
        """
        Filters the documents from previous stages to only include those matching
        the specified `BooleanExpression`.

        This stage allows you to apply conditions to the data, similar to a "WHERE"
        clause in SQL. You can filter documents based on their field values, using
        implementations of `BooleanExpression`, typically including but not limited to:
        - field comparators: `eq`, `lt` (less than), `gt` (greater than), etc.
        - logical operators: `And`, `Or`, `Not`, etc.
        - advanced functions: `regex_matches`, `array_contains`, etc.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field, And
            >>> pipeline = client.pipeline().collection("books")
            >>> pipeline = pipeline.where(
            ...     And(
            ...         Field.of("rating").gt(4.0),  # Filter for ratings > 4.0
            ...         Field.of("genre").eq("Science Fiction")  # Filter for genre
            ...     )
            ... )

        Args:
            condition: The `BooleanExpression` to apply.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Where(condition))

    def find_nearest(
        self,
        field: str | Expression,
        vector: Sequence[float] | "Vector",
        distance_measure: "DistanceMeasure",
        options: stages.FindNearestOptions | None = None,
    ) -> "_BasePipeline":
        """
        Performs a vector distance (similarity) search with the given parameters on
        the stage inputs.

        This stage adds a "nearest neighbor search" capability to your pipelines.
        Given a field or expression that evaluates to a vector and a target vector,
        this stage will identify and return the inputs whose vector is closest to
        the target vector, using the specified distance measure and options.

        Example:
            >>> from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
            >>> from google.cloud.firestore_v1.pipeline_stages import FindNearestOptions
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>>
            >>> target_vector = [0.1, 0.2, 0.3]
            >>> pipeline = client.pipeline().collection("books")
            >>> # Find using field name
            >>> pipeline = pipeline.find_nearest(
            ...     "topicVectors",
            ...     target_vector,
            ...     DistanceMeasure.COSINE,
            ...     options=FindNearestOptions(limit=10, distance_field="distance")
            ... )
            >>> # Find using Field expression
            >>> pipeline = pipeline.find_nearest(
            ...     Field.of("topicVectors"),
            ...     target_vector,
            ...     DistanceMeasure.COSINE,
            ...     options=FindNearestOptions(limit=10, distance_field="distance")
            ... )

        Args:
            field: The name of the field (str) or an expression (`Expression`) that
                evaluates to the vector data. This field should store vector values.
            vector: The target vector (sequence of floats or `Vector` object) to
                compare against.
            distance_measure: The distance measure (`DistanceMeasure`) to use
                (e.g., `DistanceMeasure.COSINE`, `DistanceMeasure.EUCLIDEAN`).
            options: Configuration options (`FindNearestOptions`) for the search,
                such as the result limit and the output distance field name.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(
            stages.FindNearest(field, vector, distance_measure, options)
        )

    def replace_with(
        self,
        field: Selectable,
    ) -> "_BasePipeline":
        """
        Fully overwrites all fields in a document with those coming from a nested map.

        This stage allows you to emit a map value as a document. Each key of the map
        becomes a field on the document containing the corresponding value.

        Example:
            Input document:
            ```json
            {
                "name": "John Doe Jr.",
                "parents": {
                    "father": "John Doe Sr.",
                    "mother": "Jane Doe"
                }
            }
            ```

            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> # Emit the 'parents' map as the document
            >>> pipeline = client.pipeline().collection("people").replace_with(Field.of("parents"))

            Output document:
            ```json
            {
                "father": "John Doe Sr.",
                "mother": "Jane Doe"
            }
            ```

        Args:
            field: The `Selectable` field containing the map whose content will
                replace the document.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.ReplaceWith(field))

    def sort(self, *orders: stages.Ordering) -> "_BasePipeline":
        """
        Sorts the documents from previous stages based on one or more `Ordering` criteria.

        This stage allows you to order the results of your pipeline. You can specify
        multiple `Ordering` instances to sort by multiple fields or expressions in
        ascending or descending order. If documents have the same value for a sorting
        criterion, the next specified ordering is used. If all orderings compare
        equal, the documents are considered equal and their relative order is
        unspecified.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Sort books by rating descending, then title ascending
            >>> pipeline = pipeline.sort(
            ...     Field.of("rating").descending(),
            ...     Field.of("title").ascending()
            ... )

        Args:
            *orders: One or more `Ordering` instances specifying the sorting criteria.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Sort(*orders))

    def sample(self, limit_or_options: int | stages.SampleOptions) -> "_BasePipeline":
        """
        Performs a pseudo-random sampling of the documents from the previous stage.

        This stage filters documents pseudo-randomly.
        - If an `int` limit is provided, it specifies the maximum number of documents
          to emit. If fewer documents are available, all are passed through.
        - If `SampleOptions` are provided, they specify how sampling is performed
          (e.g., by document count or percentage).

        Example:
            >>> from google.cloud.firestore_v1.pipeline_stages import SampleOptions
            >>> pipeline = client.pipeline().collection("books")
            >>> # Sample 10 books, if available.
            >>> pipeline = pipeline.sample(10)
            >>> pipeline = pipeline.sample(SampleOptions.doc_limit(10))
            >>> # Sample 50% of books.
            >>> pipeline = pipeline.sample(SampleOptions.percentage(0.5))

        Args:
            limit_or_options: Either an integer specifying the maximum number of
                documents to sample, or a `SampleOptions` object.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Sample(limit_or_options))

    def union(self, other: "_BasePipeline") -> "_BasePipeline":
        """
        Performs a union of all documents from this pipeline and another pipeline,
        including duplicates.

        This stage passes through documents from the previous stage of this pipeline,
        and also passes through documents from the previous stage of the `other`
        pipeline provided. The order of documents emitted from this stage is undefined.

        Example:
            >>> books_pipeline = client.pipeline().collection("books")
            >>> magazines_pipeline = client.pipeline().collection("magazines")
            >>> # Emit documents from both collections
            >>> combined_pipeline = books_pipeline.union(magazines_pipeline)

        Args:
            other: The other `Pipeline` whose results will be unioned with this one.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Union(other))

    def unnest(
        self,
        field: str | Selectable,
        alias: str | Field | None = None,
        options: stages.UnnestOptions | None = None,
    ) -> "_BasePipeline":
        """
        Produces a document for each element in an array field from the previous stage.

        For each document from the previous stage, this stage emits zero or more
        augmented documents: one for each element of the array found in the document
        field specified by the `field` parameter. Each emitted document augments the
        original by setting the `alias` field to the corresponding array element
        value. If `alias` is unset, the data in `field` is overwritten.

        Example:
            Input document:
            ```json
            { "title": "The Hitchhiker's Guide", "tags": [ "comedy", "sci-fi" ], ... }
            ```

            >>> pipeline = client.pipeline().collection("books")
            >>> # Emit a document for each tag
            >>> pipeline = pipeline.unnest("tags", alias="tag")

            Output documents (without options):
            ```json
            { "title": "The Hitchhiker's Guide", "tag": "comedy", ... }
            { "title": "The Hitchhiker's Guide", "tag": "sci-fi", ... }
            ```

        Optionally, `UnnestOptions` can specify a field to store the original index
        of the element within the array.

        Example:
            Input document:
            ```json
            { "title": "The Hitchhiker's Guide", "tags": [ "comedy", "sci-fi" ], ... }
            ```

            >>> from google.cloud.firestore_v1.pipeline_stages import UnnestOptions
            >>> pipeline = client.pipeline().collection("books")
            >>> # Emit a document for each tag, including the index
            >>> pipeline = pipeline.unnest("tags", options=UnnestOptions(index_field="tagIndex"))

            Output documents (with index_field="tagIndex"):
            ```json
            { "title": "The Hitchhiker's Guide", "tags": "comedy", "tagIndex": 0, ... }
            { "title": "The Hitchhiker's Guide", "tags": "sci-fi", "tagIndex": 1, ... }
            ```

        Args:
            field: The name of the field containing the array to unnest.
            alias: The field name used for each element value in the output documents.
                If unset, or if `alias` matches `field`, the output data will
                overwrite the original field.
            options: Optional `UnnestOptions` to configure additional behavior, like
                adding an index field.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Unnest(field, alias, options))

    def raw_stage(self, name: str, *params: Expression) -> "_BasePipeline":
        """
        Adds a stage to the pipeline by specifying the stage name as an argument. This
        does not offer any type safety on the stage params and requires the caller to
        know the order (and optionally names) of parameters accepted by the stage.

        This method provides a way to call stages that are supported by the Firestore
        backend but that are not implemented in the SDK version being used.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> # Assume we don't have a built-in "where" stage
            >>> pipeline = client.pipeline().collection("books")
            >>> pipeline = pipeline.raw_stage("where", Field.of("published").lt(900))
            >>> pipeline = pipeline.select("title", "author")

        Args:
            name: The name of the stage.
            *params: A sequence of `Expression` objects representing the parameters
                for the stage.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.RawStage(name, *params))

    def offset(self, offset: int) -> "_BasePipeline":
        """
        Skips the first `offset` number of documents from the results of previous stages.

        This stage is useful for implementing pagination, allowing you to retrieve
        results in chunks. It is typically used in conjunction with `limit()` to
        control the size of each page.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Retrieve the second page of 20 results (assuming sorted)
            >>> pipeline = pipeline.sort(Field.of("published").descending())
            >>> pipeline = pipeline.offset(20)  # Skip the first 20 results
            >>> pipeline = pipeline.limit(20)  # Take the next 20 results

        Args:
            offset: The non-negative number of documents to skip.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Offset(offset))

    def limit(self, limit: int) -> "_BasePipeline":
        """
        Limits the maximum number of documents returned by previous stages to `limit`.

        This stage is useful for controlling the size of the result set, often used for:
        - **Pagination:** In combination with `offset()` to retrieve specific pages.
        - **Top-N queries:** To get a limited number of results after sorting.
        - **Performance:** To prevent excessive data transfer.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Limit the results to the top 10 highest-rated books
            >>> pipeline = pipeline.sort(Field.of("rating").descending())
            >>> pipeline = pipeline.limit(10)

        Args:
            limit: The non-negative maximum number of documents to return.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Limit(limit))

    def aggregate(
        self,
        *accumulators: AliasedExpression[AggregateFunction],
        groups: Sequence[str | Selectable] = (),
    ) -> "_BasePipeline":
        """
        Performs aggregation operations on the documents from previous stages,
        optionally grouped by specified fields or expressions.

        This stage allows you to calculate aggregate values (like sum, average, count,
        min, max) over a set of documents.

        - **Accumulators:** Define the aggregation calculations using `AggregateFunction`
          expressions (e.g., `sum()`, `avg()`, `count()`, `min()`, `max()`) combined
          with `as_()` to name the result field.
        - **Groups:** Optionally specify fields (by name or `Selectable`) to group
          the documents by. Aggregations are then performed within each distinct group.
          If no groups are provided, the aggregation is performed over the entire input.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Calculate the average rating and total count for all books
            >>> pipeline = pipeline.aggregate(
            ...     Field.of("rating").avg().as_("averageRating"),
            ...     Field.of("rating").count().as_("totalBooks")
            ... )
            >>> # Calculate the average rating for each genre
            >>> pipeline = pipeline.aggregate(
            ...     Field.of("rating").avg().as_("avg_rating"),
            ...     groups=["genre"]  # Group by the 'genre' field
            ... )
            >>> # Calculate the count for each author, grouping by Field object
            >>> pipeline = pipeline.aggregate(
            ...     Field.of("author").count().as_("bookCount"),
            ...     groups=[Field.of("author")]
            ... )

        Args:
            *accumulators: One or more expressions defining the aggregations to perform
                and their corresponding output names.
            groups: An optional sequence of field names (str) or `Selectable`
                expressions to group by before aggregating.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Aggregate(*accumulators, groups=groups))

    def distinct(self, *fields: str | Selectable) -> "_BasePipeline":
        """
        Returns documents with distinct combinations of values for the specified
        fields or expressions.

        This stage filters the results from previous stages to include only one
        document for each unique combination of values in the specified `fields`.
        The output documents contain only the fields specified in the `distinct` call.

        Example:
            >>> from google.cloud.firestore_v1.pipeline_expressions import Field
            >>> pipeline = client.pipeline().collection("books")
            >>> # Get a list of unique genres (output has only 'genre' field)
            >>> pipeline = pipeline.distinct("genre")
            >>> # Get unique combinations of author (uppercase) and genre
            >>> pipeline = pipeline.distinct(
            ...     Field.of("author").to_upper().as_("authorUpper"),
            ...     Field.of("genre")
            ... )

        Args:
            *fields: Field names (str) or `Selectable` expressions to consider when
                determining distinct value combinations. The output will only
                contain these fields/expressions.

        Returns:
            A new Pipeline object with this stage appended to the stage list
        """
        return self._append(stages.Distinct(*fields))