Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_pipeline.py: 60%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

20 statements  

1# Copyright 2025 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14""" 

15.. warning:: 

16 **Preview API**: Firestore Pipelines is currently in preview and is 

17 subject to potential breaking changes in future releases 

18""" 

19 

20from __future__ import annotations 

21from typing import TYPE_CHECKING 

22from google.cloud.firestore_v1 import pipeline_stages as stages 

23from google.cloud.firestore_v1.base_pipeline import _BasePipeline 

24from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream 

25from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot 

26from google.cloud.firestore_v1.pipeline_result import PipelineResult 

27 

28if TYPE_CHECKING: # pragma: NO COVER 

29 import datetime 

30 from google.cloud.firestore_v1.async_client import AsyncClient 

31 from google.cloud.firestore_v1.async_transaction import AsyncTransaction 

32 from google.cloud.firestore_v1.pipeline_expressions import Constant 

33 from google.cloud.firestore_v1.types.document import Value 

34 from google.cloud.firestore_v1.query_profile import PipelineExplainOptions 

35 

36 

37class AsyncPipeline(_BasePipeline): 

38 """ 

39 Pipelines allow for complex data transformations and queries involving 

40 multiple stages like filtering, projection, aggregation, and vector search. 

41 

42 This class extends `_BasePipeline` and provides methods to execute the 

43 defined pipeline stages using an asynchronous `AsyncClient`. 

44 

45 Usage Example: 

46 >>> from google.cloud.firestore_v1.pipeline_expressions import Field 

47 >>> 

48 >>> async def run_pipeline(): 

49 ... client = AsyncClient(...) 

50 ... pipeline = client.pipeline() 

51 ... .collection("books") 

52 ... .where(Field.of("published").gt(1980)) 

53 ... .select("title", "author") 

54 ... async for result in pipeline.stream(): 

55 ... print(result) 

56 

57 Use `client.pipeline()` to create instances of this class. 

58 

59 .. warning:: 

60 **Preview API**: Firestore Pipelines is currently in preview and is 

61 subject to potential breaking changes in future releases 

62 """ 

63 

64 def __init__(self, client: AsyncClient, *stages: stages.Stage): 

65 """ 

66 Initializes an asynchronous Pipeline. 

67 

68 Args: 

69 client: The asynchronous `AsyncClient` instance to use for execution. 

70 *stages: Initial stages for the pipeline. 

71 """ 

72 super().__init__(client, *stages) 

73 

74 async def execute( 

75 self, 

76 *, 

77 transaction: "AsyncTransaction" | None = None, 

78 read_time: datetime.datetime | None = None, 

79 explain_options: PipelineExplainOptions | None = None, 

80 additional_options: dict[str, Value | Constant] = {}, 

81 ) -> PipelineSnapshot[PipelineResult]: 

82 """ 

83 Executes this pipeline and returns results as a list 

84 

85 Args: 

86 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

87 An existing transaction that this query will run in. 

88 If a ``transaction`` is used and it already has write operations 

89 added, this method cannot be used (i.e. read-after-write is not 

90 allowed). 

91 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

92 time. This must be a microsecond precision timestamp within the past one hour, or 

93 if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp 

94 within the past 7 days. For the most accurate results, use UTC timezone. 

95 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): 

96 Options to enable query profiling for this query. When set, 

97 explain_metrics will be available on the returned list. 

98 additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. 

99 These options will take precedence over method argument if there is a conflict (e.g. explain_options) 

100 """ 

101 kwargs = {k: v for k, v in locals().items() if k != "self"} 

102 stream = AsyncPipelineStream(PipelineResult, self, **kwargs) 

103 results = [result async for result in stream] 

104 return PipelineSnapshot(results, stream) 

105 

106 def stream( 

107 self, 

108 *, 

109 read_time: datetime.datetime | None = None, 

110 transaction: "AsyncTransaction" | None = None, 

111 explain_options: PipelineExplainOptions | None = None, 

112 additional_options: dict[str, Value | Constant] = {}, 

113 ) -> AsyncPipelineStream[PipelineResult]: 

114 """ 

115 Process this pipeline as a stream, providing results through an AsyncIterable 

116 

117 Args: 

118 transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

119 An existing transaction that this query will run in. 

120 If a ``transaction`` is used and it already has write operations 

121 added, this method cannot be used (i.e. read-after-write is not 

122 allowed). 

123 read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given 

124 time. This must be a microsecond precision timestamp within the past one hour, or 

125 if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp 

126 within the past 7 days. For the most accurate results, use UTC timezone. 

127 explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]): 

128 Options to enable query profiling for this query. When set, 

129 explain_metrics will be available on the returned generator. 

130 additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query. 

131 These options will take precedence over method argument if there is a conflict (e.g. explain_options) 

132 """ 

133 kwargs = {k: v for k, v in locals().items() if k != "self"} 

134 return AsyncPipelineStream(PipelineResult, self, **kwargs)