Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/pipeline.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

20 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14""" 

15.. warning:: 

16 **Preview API**: Firestore Pipelines is currently in preview and is 

17 subject to potential breaking changes in future releases. 

18""" 

19 

20from __future__ import annotations 

21from typing import TYPE_CHECKING 

22from google.cloud.firestore_v1 import pipeline_stages as stages 

23from google.cloud.firestore_v1.base_pipeline import _BasePipeline 

24from google.cloud.firestore_v1.pipeline_result import PipelineStream 

25from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot 

26from google.cloud.firestore_v1.pipeline_result import PipelineResult 

27 

28if TYPE_CHECKING: # pragma: NO COVER 

29 import datetime 

30 from google.cloud.firestore_v1.client import Client 

31 from google.cloud.firestore_v1.pipeline_expressions import Constant 

32 from google.cloud.firestore_v1.transaction import Transaction 

33 from google.cloud.firestore_v1.types.document import Value 

34 from google.cloud.firestore_v1.query_profile import PipelineExplainOptions 

35 

36 

class Pipeline(_BasePipeline):
    """
    Pipelines allow for complex data transformations and queries involving
    multiple stages like filtering, projection, aggregation, and vector search.

    Usage Example:
        >>> from google.cloud.firestore_v1.pipeline_expressions import Field
        >>>
        >>> def run_pipeline():
        ...     client = Client(...)
        ...     pipeline = (
        ...         client.pipeline()
        ...         .collection("books")
        ...         .where(Field.of("published").gt(1980))
        ...         .select("title", "author")
        ...     )
        ...     for result in pipeline.execute():
        ...         print(result)

    Use `client.pipeline()` to create instances of this class.

    .. warning::
        **Preview API**: Firestore Pipelines is currently in preview and is
        subject to potential breaking changes in future releases.
    """

    def __init__(self, client: Client, *stages: stages.Stage):
        """
        Initializes a Pipeline.

        Args:
            client: The `Client` instance to use for execution.
            *stages: Initial stages for the pipeline.
        """
        super().__init__(client, *stages)

    def execute(
        self,
        *,
        transaction: Transaction | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> PipelineSnapshot[PipelineResult]:
        """
        Executes this pipeline and returns results as a list

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned list.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options).
                ``None`` (the default) is treated as an empty dict.

        Returns:
            A ``PipelineSnapshot`` built from every result yielded by the
            underlying ``PipelineStream`` together with that stream.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between unrelated invocations.
        options = {} if additional_options is None else additional_options
        # Arguments are forwarded by name rather than harvested from
        # ``locals()`` so that adding a local variable can never change what
        # is sent to ``PipelineStream``.
        stream = PipelineStream(
            PipelineResult,
            self,
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            additional_options=options,
        )
        # Consume the stream completely before building the snapshot.
        results = list(stream)
        return PipelineSnapshot(results, stream)

    def stream(
        self,
        *,
        transaction: Transaction | None = None,
        read_time: datetime.datetime | None = None,
        explain_options: PipelineExplainOptions | None = None,
        additional_options: dict[str, Value | Constant] | None = None,
    ) -> PipelineStream[PipelineResult]:
        """
        Process this pipeline as a stream, providing results through an Iterable

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.
            explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
                These options will take precedence over method argument if there is a conflict (e.g. explain_options).
                ``None`` (the default) is treated as an empty dict.

        Returns:
            A ``PipelineStream`` of ``PipelineResult`` for this pipeline; this
            method itself does not iterate it.
        """
        # A fresh dict per call avoids sharing one mutable default object
        # between unrelated invocations.
        options = {} if additional_options is None else additional_options
        return PipelineStream(
            PipelineResult,
            self,
            transaction=transaction,
            read_time=read_time,
            explain_options=explain_options,
            additional_options=options,
        )