Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/pipeline_source.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

25 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14""" 

15.. warning:: 

16 **Preview API**: Firestore Pipelines is currently in preview and is 

17 subject to potential breaking changes in future releases. 

18""" 

19 

20from __future__ import annotations 

21from typing import Generic, TypeVar, TYPE_CHECKING 

22from google.cloud.firestore_v1 import pipeline_stages as stages 

23from google.cloud.firestore_v1.base_pipeline import _BasePipeline 

24from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER 

25 

26if TYPE_CHECKING: # pragma: NO COVER 

27 from google.cloud.firestore_v1.client import Client 

28 from google.cloud.firestore_v1.async_client import AsyncClient 

29 from google.cloud.firestore_v1.base_document import BaseDocumentReference 

30 from google.cloud.firestore_v1.base_query import BaseQuery 

31 from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery 

32 from google.cloud.firestore_v1.base_collection import BaseCollectionReference 

33 

34 

35PipelineType = TypeVar("PipelineType", bound=_BasePipeline) 

36 

37 

38class PipelineSource(Generic[PipelineType]): 

39 """ 

40 A factory for creating Pipeline instances, which provide a framework for building data 

41 transformation and query pipelines for Firestore. 

42 

43 Not meant to be instantiated directly. Instead, start by calling client.pipeline() 

44 to obtain an instance of PipelineSource. From there, you can use the provided 

45 methods to specify the data source for your pipeline. 

46 """ 

47 

48 def __init__(self, client: Client | AsyncClient): 

49 self.client = client 

50 

51 def _create_pipeline(self, source_stage): 

52 return self.client._pipeline_cls._create_with_stages(self.client, source_stage) 

53 

54 def create_from( 

55 self, query: "BaseQuery" | "BaseAggregationQuery" | "BaseCollectionReference" 

56 ) -> PipelineType: 

57 """ 

58 Create a pipeline from an existing query 

59 

60 Args: 

61 query: the query to build the pipeline off of 

62 Returns: 

63 a new pipeline instance representing the query 

64 """ 

65 return query._build_pipeline(self) 

66 

67 def collection(self, path: str | tuple[str]) -> PipelineType: 

68 """ 

69 Creates a new Pipeline that operates on a specified Firestore collection. 

70 

71 Args: 

72 path: The path to the Firestore collection (e.g., "users"). Can either be: 

73 * A single ``/``-delimited path to a collection 

74 * A tuple of collection path segment 

75 Returns: 

76 a new pipeline instance targeting the specified collection 

77 """ 

78 if isinstance(path, tuple): 

79 path = DOCUMENT_PATH_DELIMITER.join(path) 

80 return self._create_pipeline(stages.Collection(path)) 

81 

82 def collection_group(self, collection_id: str) -> PipelineType: 

83 """ 

84 Creates a new Pipeline that that operates on all documents in a collection group. 

85 Args: 

86 collection_id: The ID of the collection group 

87 Returns: 

88 a new pipeline instance targeting the specified collection group 

89 """ 

90 return self._create_pipeline(stages.CollectionGroup(collection_id)) 

91 

92 def database(self) -> PipelineType: 

93 """ 

94 Creates a new Pipeline that operates on all documents in the Firestore database. 

95 Returns: 

96 a new pipeline instance targeting the specified collection 

97 """ 

98 return self._create_pipeline(stages.Database()) 

99 

100 def documents(self, *docs: "BaseDocumentReference") -> PipelineType: 

101 """ 

102 Creates a new Pipeline that operates on a specific set of Firestore documents. 

103 Args: 

104 docs: The DocumentReference instances representing the documents to include in the pipeline. 

105 Returns: 

106 a new pipeline instance targeting the specified documents 

107 """ 

108 return self._create_pipeline(stages.Documents.of(*docs))