Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/base_pipeline.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

59 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15from __future__ import annotations 

16from typing import Sequence, TYPE_CHECKING 

17from google.cloud.firestore_v1 import pipeline_stages as stages 

18from google.cloud.firestore_v1.types.pipeline import ( 

19 StructuredPipeline as StructuredPipeline_pb, 

20) 

21from google.cloud.firestore_v1.vector import Vector 

22from google.cloud.firestore_v1.base_vector_query import DistanceMeasure 

23from google.cloud.firestore_v1.pipeline_expressions import ( 

24 AggregateFunction, 

25 AliasedExpression, 

26 Expression, 

27 Field, 

28 BooleanExpression, 

29 Selectable, 

30) 

31 

32if TYPE_CHECKING: # pragma: NO COVER 

33 from google.cloud.firestore_v1.client import Client 

34 from google.cloud.firestore_v1.async_client import AsyncClient 

35 

36 

37class _BasePipeline: 

38 """ 

39 Base class for building Firestore data transformation and query pipelines. 

40 

41 This class is not intended to be instantiated directly. 

42 Use `client.pipeline()` to create pipeline instances. 

43 """ 

44 

45 def __init__(self, client: Client | AsyncClient): 

46 """ 

47 Initializes a new pipeline. 

48 

49 Pipelines should not be instantiated directly. Instead, 

50 call client.pipeline() to create an instance.

51 

52 Args: 

53 client: The client associated with the pipeline 

54 """ 

55 self._client = client 

56 self.stages: Sequence[stages.Stage] = tuple() 

57 

58 @classmethod 

59 def _create_with_stages( 

60 cls, client: Client | AsyncClient, *stages 

61 ) -> _BasePipeline: 

62 """ 

63 Initializes a new pipeline with the given stages. 

64 

65 Pipeline classes should not be instantiated directly. 

66 

67 Args: 

68 client: The client associated with the pipeline 

69 *stages: Initial stages for the pipeline. 

70 """ 

71 new_instance = cls(client) 

72 new_instance.stages = tuple(stages) 

73 return new_instance 

74 

75 def __repr__(self): 

76 cls_str = type(self).__name__ 

77 if not self.stages: 

78 return f"{cls_str}()" 

79 elif len(self.stages) == 1: 

80 return f"{cls_str}({self.stages[0]!r})" 

81 else: 

82 stages_str = ",\n ".join([repr(s) for s in self.stages]) 

83 return f"{cls_str}(\n {stages_str}\n)" 

84 

85 def _to_pb(self, **options) -> StructuredPipeline_pb: 

86 return StructuredPipeline_pb( 

87 pipeline={"stages": [s._to_pb() for s in self.stages]}, 

88 options=options, 

89 ) 

90 

91 def _append(self, new_stage): 

92 """ 

93 Create a new Pipeline object with a new stage appended 

94 """ 

95 return self.__class__._create_with_stages(self._client, *self.stages, new_stage) 

96 

97 def add_fields(self, *fields: Selectable) -> "_BasePipeline": 

98 """ 

99 Adds new fields to outputs from previous stages. 

100 

101 This stage allows you to compute values on-the-fly based on existing data 

102 from previous stages or constants. You can use this to create new fields 

103 or overwrite existing ones (if there is name overlap). 

104 

105 The added fields are defined using `Selectable` expressions, which can be: 

106 - `Field`: References an existing document field. 

107 - `Function`: Performs a calculation using functions like `add`, 

108 `multiply` with assigned aliases using `Expression.as_()`. 

109 

110 Example: 

111 >>> from google.cloud.firestore_v1.pipeline_expressions import Field, add 

112 >>> pipeline = client.pipeline().collection("books") 

113 >>> pipeline = pipeline.add_fields( 

114 ... Field.of("rating").as_("bookRating"), # Rename 'rating' to 'bookRating' 

115 ... add(5, Field.of("quantity")).as_("totalCost") # Calculate 'totalCost' 

116 ... ) 

117 

118 Args: 

119 *fields: The fields to add to the documents, specified as `Selectable` 

120 expressions. 

121 Returns: 

122 A new Pipeline object with this stage appended to the stage list 

123 """ 

124 return self._append(stages.AddFields(*fields)) 

125 

126 def remove_fields(self, *fields: Field | str) -> "_BasePipeline": 

127 """ 

128 Removes fields from outputs of previous stages. 

129 

130 Example: 

131 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

132 >>> pipeline = client.pipeline().collection("books") 

133 >>> # Remove by name 

134 >>> pipeline = pipeline.remove_fields("rating", "cost") 

135 >>> # Remove by Field object 

136 >>> pipeline = pipeline.remove_fields(Field.of("rating"), Field.of("cost")) 

137 

138 

139 Args: 

140 *fields: The fields to remove, specified as field names (str) or 

141 `Field` objects. 

142 

143 Returns: 

144 A new Pipeline object with this stage appended to the stage list 

145 """ 

146 return self._append(stages.RemoveFields(*fields)) 

147 

148 def select(self, *selections: str | Selectable) -> "_BasePipeline": 

149 """ 

150 Selects or creates a set of fields from the outputs of previous stages. 

151 

152 The selected fields are defined using `Selectable` expressions or field names: 

153 - `Field`: References an existing document field. 

154 - `Function`: Represents the result of a function with an assigned alias 

155 name using `Expression.as_()`. 

156 - `str`: The name of an existing field. 

157 

158 If no selections are provided, the output of this stage is empty. Use 

159 `add_fields()` instead if only additions are desired. 

160 

161 Example: 

162 >>> from google.cloud.firestore_v1.pipeline_expressions import Field, to_upper 

163 >>> pipeline = client.pipeline().collection("books") 

164 >>> # Select by name 

165 >>> pipeline = pipeline.select("name", "address") 

166 >>> # Select using Field and Function expressions 

167 >>> pipeline = pipeline.select( 

168 ... Field.of("name"), 

169 ... Field.of("address").to_upper().as_("upperAddress"), 

170 ... ) 

171 

172 Args: 

173 *selections: The fields to include in the output documents, specified as 

174 field names (str) or `Selectable` expressions. 

175 

176 Returns: 

177 A new Pipeline object with this stage appended to the stage list 

178 """ 

179 return self._append(stages.Select(*selections)) 

180 

181 def where(self, condition: BooleanExpression) -> "_BasePipeline": 

182 """ 

183 Filters the documents from previous stages to only include those matching 

184 the specified `BooleanExpression`. 

185 

186 This stage allows you to apply conditions to the data, similar to a "WHERE" 

187 clause in SQL. You can filter documents based on their field values, using 

188 implementations of `BooleanExpression`, typically including but not limited to: 

189 - field comparators: `eq`, `lt` (less than), `gt` (greater than), etc. 

190 - logical operators: `And`, `Or`, `Not`, etc. 

191 - advanced functions: `regex_matches`, `array_contains`, etc. 

192 

193 Example: 

194 >>> from google.cloud.firestore_v1.pipeline_expressions import Field, And

195 >>> pipeline = client.pipeline().collection("books") 

196 >>> # Using static functions 

197 >>> pipeline = pipeline.where( 

198 ... And( 

199 ... Field.of("rating").gt(4.0), # Filter for ratings > 4.0 

200 ... Field.of("genre").eq("Science Fiction") # Filter for genre 

201 ... ) 

202 ... ) 

203 >>> # Using methods on expressions 

204 >>> pipeline = pipeline.where( 

205 ... And( 

206 ... Field.of("rating").gt(4.0), 

207 ... Field.of("genre").eq("Science Fiction") 

208 ... ) 

209 ... ) 

210 

211 

212 Args: 

213 condition: The `BooleanExpression` to apply. 

214 

215 Returns: 

216 A new Pipeline object with this stage appended to the stage list 

217 """ 

218 return self._append(stages.Where(condition)) 

219 

220 def find_nearest( 

221 self, 

222 field: str | Expression, 

223 vector: Sequence[float] | "Vector", 

224 distance_measure: "DistanceMeasure", 

225 options: stages.FindNearestOptions | None = None, 

226 ) -> "_BasePipeline": 

227 """ 

228 Performs vector distance (similarity) search with given parameters on the 

229 stage inputs. 

230 

231 This stage adds a "nearest neighbor search" capability to your pipelines. 

232 Given a field or expression that evaluates to a vector and a target vector, 

233 this stage will identify and return the inputs whose vector is closest to 

234 the target vector, using the specified distance measure and options. 

235 

236 Example: 

237 >>> from google.cloud.firestore_v1.base_vector_query import DistanceMeasure 

238 >>> from google.cloud.firestore_v1.pipeline_stages import FindNearestOptions 

239 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

240 >>> 

241 >>> target_vector = [0.1, 0.2, 0.3] 

242 >>> pipeline = client.pipeline().collection("books") 

243 >>> # Find using field name 

244 >>> pipeline = pipeline.find_nearest( 

245 ... "topicVectors", 

246 ... target_vector, 

247 ... DistanceMeasure.COSINE, 

248 ... options=FindNearestOptions(limit=10, distance_field="distance") 

249 ... ) 

250 >>> # Find using Field expression 

251 >>> pipeline = pipeline.find_nearest( 

252 ... Field.of("topicVectors"), 

253 ... target_vector, 

254 ... DistanceMeasure.COSINE, 

255 ... options=FindNearestOptions(limit=10, distance_field="distance") 

256 ... ) 

257 

258 Args: 

259 field: The name of the field (str) or an expression (`Expression`) that 

260 evaluates to the vector data. This field should store vector values. 

261 vector: The target vector (sequence of floats or `Vector` object) to 

262 compare against. 

263 distance_measure: The distance measure (`DistanceMeasure`) to use 

264 (e.g., `DistanceMeasure.COSINE`, `DistanceMeasure.EUCLIDEAN`). 

265 options: Configuration options (`FindNearestOptions`) for the search,

266 such as `limit` (the maximum number of nearest neighbors to return)

267 and the name of the output distance field.

268 

269 Returns: 

270 A new Pipeline object with this stage appended to the stage list 

271 """ 

272 return self._append( 

273 stages.FindNearest(field, vector, distance_measure, options) 

274 ) 

275 

276 def replace_with( 

277 self, 

278 field: Selectable, 

279 ) -> "_BasePipeline": 

280 """ 

281 Fully overwrites all fields in a document with those coming from a nested map. 

282 

283 This stage allows you to emit a map value as a document. Each key of the map becomes a field 

284 on the document that contains the corresponding value. 

285 

286 Example: 

287 Input document: 

288 ```json 

289 { 

290 "name": "John Doe Jr.", 

291 "parents": { 

292 "father": "John Doe Sr.", 

293 "mother": "Jane Doe" 

294 } 

295 } 

296 ``` 

297 

298 >>> # Emit the 'parents' map as the document 

299 >>> pipeline = client.pipeline().collection("people").replace_with(Field.of("parents")) 

300 

301 Output document: 

302 ```json 

303 { 

304 "father": "John Doe Sr.", 

305 "mother": "Jane Doe" 

306 } 

307 ``` 

308 

309 Args: 

310 field: The `Selectable` field containing the map whose content will 

311 replace the document. 

312 Returns: 

313 A new Pipeline object with this stage appended to the stage list 

314 """ 

315 return self._append(stages.ReplaceWith(field)) 

316 

317 def sort(self, *orders: stages.Ordering) -> "_BasePipeline": 

318 """ 

319 Sorts the documents from previous stages based on one or more `Ordering` criteria. 

320 

321 This stage allows you to order the results of your pipeline. You can specify 

322 multiple `Ordering` instances to sort by multiple fields or expressions in 

323 ascending or descending order. If documents have the same value for a sorting 

324 criterion, the next specified ordering will be used. If all orderings result 

325 in equal comparison, the documents are considered equal and the relative order 

326 is unspecified. 

327 

328 Example: 

329 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

330 >>> pipeline = client.pipeline().collection("books") 

331 >>> # Sort books by rating descending, then title ascending 

332 >>> pipeline = pipeline.sort( 

333 ... Field.of("rating").descending(), 

334 ... Field.of("title").ascending() 

335 ... ) 

336 

337 Args: 

338 *orders: One or more `Ordering` instances specifying the sorting criteria. 

339 

340 Returns: 

341 A new Pipeline object with this stage appended to the stage list 

342 """ 

343 return self._append(stages.Sort(*orders)) 

344 

345 def sample(self, limit_or_options: int | stages.SampleOptions) -> "_BasePipeline": 

346 """ 

347 Performs a pseudo-random sampling of the documents from the previous stage. 

348 

349 This stage filters documents pseudo-randomly. 

350 - If an `int` limit is provided, it specifies the maximum number of documents 

351 to emit. If fewer documents are available, all are passed through. 

352 - If `SampleOptions` are provided, they specify how sampling is performed 

353 (e.g., by document count or percentage). 

354 

355 Example: 

356 >>> from google.cloud.firestore_v1.pipeline_stages import SampleOptions

357 >>> pipeline = client.pipeline().collection("books") 

358 >>> # Sample 10 books, if available. 

359 >>> pipeline = pipeline.sample(10) 

360 >>> pipeline = pipeline.sample(SampleOptions.doc_limit(10)) 

361 >>> # Sample 50% of books. 

362 >>> pipeline = pipeline.sample(SampleOptions.percentage(0.5)) 

363 

364 

365 Args: 

366 limit_or_options: Either an integer specifying the maximum number of 

367 documents to sample, or a `SampleOptions` object. 

368 

369 Returns: 

370 A new Pipeline object with this stage appended to the stage list 

371 """ 

372 return self._append(stages.Sample(limit_or_options)) 

373 

374 def union(self, other: "_BasePipeline") -> "_BasePipeline": 

375 """ 

376 Performs a union of all documents from this pipeline and another pipeline, 

377 including duplicates. 

378 

379 This stage passes through documents from the previous stage of this pipeline, 

380 and also passes through documents from the previous stage of the `other` 

381 pipeline provided. The order of documents emitted from this stage is undefined. 

382 

383 Example: 

384 >>> books_pipeline = client.pipeline().collection("books") 

385 >>> magazines_pipeline = client.pipeline().collection("magazines") 

386 >>> # Emit documents from both collections 

387 >>> combined_pipeline = books_pipeline.union(magazines_pipeline) 

388 

389 Args: 

390 other: The other `Pipeline` whose results will be unioned with this one. 

391 

392 Returns: 

393 A new Pipeline object with this stage appended to the stage list 

394 """ 

395 return self._append(stages.Union(other)) 

396 

397 def unnest( 

398 self, 

399 field: str | Selectable, 

400 alias: str | Field | None = None, 

401 options: stages.UnnestOptions | None = None, 

402 ) -> "_BasePipeline": 

403 """ 

404 Produces a document for each element in an array field from the previous stage document. 

405 

406 For each previous stage document, this stage will emit zero or more augmented documents. The 

407 input array found in the previous stage document field specified by the `field` parameter,

408 will emit an augmented document for each input array element. The input array element will 

409 augment the previous stage document by setting the `alias` field with the array element value. 

410 If `alias` is unset, the data in `field` will be overwritten. 

411 

412 Example: 

413 Input document: 

414 ```json 

415 { "title": "The Hitchhiker's Guide", "tags": [ "comedy", "sci-fi" ], ... } 

416 ``` 

417 

418 >>> from google.cloud.firestore_v1.pipeline_stages import UnnestOptions 

419 >>> pipeline = client.pipeline().collection("books") 

420 >>> # Emit a document for each tag 

421 >>> pipeline = pipeline.unnest("tags", alias="tag") 

422 

423 Output documents (without options): 

424 ```json 

425 { "title": "The Hitchhiker's Guide", "tag": "comedy", ... } 

426 { "title": "The Hitchhiker's Guide", "tag": "sci-fi", ... } 

427 ``` 

428 

429 Optionally, `UnnestOptions` can specify a field to store the original index 

430 of the element within the array.

431 

432 Example: 

433 Input document: 

434 ```json 

435 { "title": "The Hitchhiker's Guide", "tags": [ "comedy", "sci-fi" ], ... } 

436 ``` 

437 

438 >>> from google.cloud.firestore_v1.pipeline_stages import UnnestOptions 

439 >>> pipeline = client.pipeline().collection("books") 

440 >>> # Emit a document for each tag, including the index 

441 >>> pipeline = pipeline.unnest("tags", options=UnnestOptions(index_field="tagIndex")) 

442 

443 Output documents (with index_field="tagIndex"): 

444 ```json 

445 { "title": "The Hitchhiker's Guide", "tags": "comedy", "tagIndex": 0, ... } 

446 { "title": "The Hitchhiker's Guide", "tags": "sci-fi", "tagIndex": 1, ... } 

447 ``` 

448 

449 Args: 

450 field: The name of the field containing the array to unnest. 

451 alias: The alias field is used as the field name for each element within the output array.

452 If unset, or if `alias` matches the `field`, the output data will overwrite the original field. 

453 options: Optional `UnnestOptions` to configure additional behavior, like adding an index field. 

454 

455 Returns: 

456 A new Pipeline object with this stage appended to the stage list 

457 """ 

458 return self._append(stages.Unnest(field, alias, options)) 

459 

460 def raw_stage(self, name: str, *params: Expression) -> "_BasePipeline": 

461 """ 

462 Adds a stage to the pipeline by specifying the stage name as an argument. This does not offer any 

463 type safety on the stage params and requires the caller to know the order (and optionally names) 

464 of parameters accepted by the stage. 

465 

466 This method provides a way to call stages that are supported by the Firestore backend but that

467 are not implemented in the SDK version being used. 

468 

469 Example: 

470 >>> # Assume we don't have a built-in "where" stage 

471 >>> pipeline = client.pipeline().collection("books") 

472 >>> pipeline = pipeline.raw_stage("where", Field.of("published").lt(900)) 

473 >>> pipeline = pipeline.select("title", "author") 

474 

475 Args: 

476 name: The name of the stage. 

477 *params: A sequence of `Expression` objects representing the parameters for the stage. 

478 

479 Returns: 

480 A new Pipeline object with this stage appended to the stage list 

481 """ 

482 return self._append(stages.RawStage(name, *params)) 

483 

484 def offset(self, offset: int) -> "_BasePipeline": 

485 """ 

486 Skips the first `offset` number of documents from the results of previous stages. 

487 

488 This stage is useful for implementing pagination, allowing you to retrieve 

489 results in chunks. It is typically used in conjunction with `limit()` to 

490 control the size of each page. 

491 

492 Example: 

493 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

494 >>> pipeline = client.pipeline().collection("books") 

495 >>> # Retrieve the second page of 20 results (assuming sorted) 

496 >>> pipeline = pipeline.sort(Field.of("published").descending()) 

497 >>> pipeline = pipeline.offset(20) # Skip the first 20 results 

498 >>> pipeline = pipeline.limit(20) # Take the next 20 results 

499 

500 Args: 

501 offset: The non-negative number of documents to skip. 

502 

503 Returns: 

504 A new Pipeline object with this stage appended to the stage list 

505 """ 

506 return self._append(stages.Offset(offset)) 

507 

508 def limit(self, limit: int) -> "_BasePipeline": 

509 """ 

510 Limits the maximum number of documents returned by previous stages to `limit`. 

511 

512 This stage is useful for controlling the size of the result set, often used for: 

513 - **Pagination:** In combination with `offset()` to retrieve specific pages. 

514 - **Top-N queries:** To get a limited number of results after sorting. 

515 - **Performance:** To prevent excessive data transfer. 

516 

517 Example: 

518 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

519 >>> pipeline = client.pipeline().collection("books") 

520 >>> # Limit the results to the top 10 highest-rated books 

521 >>> pipeline = pipeline.sort(Field.of("rating").descending()) 

522 >>> pipeline = pipeline.limit(10) 

523 

524 Args: 

525 limit: The non-negative maximum number of documents to return. 

526 

527 Returns: 

528 A new Pipeline object with this stage appended to the stage list 

529 """ 

530 return self._append(stages.Limit(limit)) 

531 

532 def aggregate( 

533 self, 

534 *accumulators: AliasedExpression[AggregateFunction], 

535 groups: Sequence[str | Selectable] = (), 

536 ) -> "_BasePipeline": 

537 """ 

538 Performs aggregation operations on the documents from previous stages, 

539 optionally grouped by specified fields or expressions. 

540 

541 This stage allows you to calculate aggregate values (like sum, average, count, 

542 min, max) over a set of documents. 

543 

544 - **Accumulators:** Define the aggregation calculations using `AggregateFunction` 

545 expressions (e.g., `sum()`, `avg()`, `count()`, `min()`, `max()`) combined 

546 with `as_()` to name the result field. 

547 - **Groups:** Optionally specify fields (by name or `Selectable`) to group 

548 the documents by. Aggregations are then performed within each distinct group. 

549 If no groups are provided, the aggregation is performed over the entire input. 

550 Example: 

551 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

552 >>> pipeline = client.pipeline().collection("books") 

553 >>> # Calculate the average rating and total count for all books 

554 >>> pipeline = pipeline.aggregate( 

555 ... Field.of("rating").avg().as_("averageRating"), 

556 ... Field.of("rating").count().as_("totalBooks") 

557 ... ) 

558 >>> # Calculate the average rating for each genre 

559 >>> pipeline = pipeline.aggregate( 

560 ... Field.of("rating").avg().as_("avg_rating"), 

561 ... groups=["genre"] # Group by the 'genre' field 

562 ... ) 

563 >>> # Calculate the count for each author, grouping by Field object 

564 >>> pipeline = pipeline.aggregate( 

565 ... Count().as_("bookCount"), 

566 ... groups=[Field.of("author")] 

567 ... ) 

568 

569 

570 Args: 

571 *accumulators: One or more expressions defining the aggregations to perform and their 

572 corresponding output names. 

573 groups: An optional sequence of field names (str) or `Selectable` 

574 expressions to group by before aggregating. 

575 

576 Returns: 

577 A new Pipeline object with this stage appended to the stage list 

578 """ 

579 return self._append(stages.Aggregate(*accumulators, groups=groups)) 

580 

581 def distinct(self, *fields: str | Selectable) -> "_BasePipeline": 

582 """ 

583 Returns documents with distinct combinations of values for the specified 

584 fields or expressions. 

585 

586 This stage filters the results from previous stages to include only one 

587 document for each unique combination of values in the specified `fields`. 

588 The output documents contain only the fields specified in the `distinct` call. 

589 

590 Example: 

591 >>> from google.cloud.firestore_v1.pipeline_expressions import Field, to_upper 

592 >>> pipeline = client.pipeline().collection("books") 

593 >>> # Get a list of unique genres (output has only 'genre' field) 

594 >>> pipeline = pipeline.distinct("genre") 

595 >>> # Get unique combinations of author (uppercase) and genre 

596 >>> pipeline = pipeline.distinct( 

597 ... Field.of("author").to_upper().as_("authorUpper"), 

598 ... Field.of("genre") 

599 ... ) 

600 

601 

602 Args: 

603 *fields: Field names (str) or `Selectable` expressions to consider when 

604 determining distinct value combinations. The output will only 

605 contain these fields/expressions. 

606 

607 Returns: 

608 A new Pipeline object with this stage appended to the stage list 

609 """ 

610 return self._append(stages.Distinct(*fields))