1# Copyright 2024 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for iterating over stream results async for the Google Cloud
16Firestore API.
17"""
18from __future__ import annotations
19
20from typing import TYPE_CHECKING, Any, AsyncGenerator, Coroutine, Optional, TypeVar
21
22from google.cloud.firestore_v1.query_profile import (
23 ExplainMetrics,
24 QueryExplainError,
25)
26import google.cloud.firestore_v1.types.query_profile as query_profile_pb
27
28if TYPE_CHECKING: # pragma: NO COVER
29 from google.cloud.firestore_v1.query_profile import ExplainOptions
30
31
32T = TypeVar("T")
33
34
class AsyncStreamGenerator(AsyncGenerator[T, Any]):
    """Asynchronous Generator for the streamed results.

    Wraps an inner async generator and hands its items to the caller. If the
    inner generator produces a ``query_profile_pb.ExplainMetrics`` message,
    that message is converted with ``ExplainMetrics._from_pb``, stored on this
    object, and ``StopAsyncIteration`` is raised instead of returning the
    message. The same filtering is applied whether the stream is driven with
    ``__anext__``, ``asend`` or ``athrow``.

    Args:
        response_generator (AsyncGenerator):
            The inner generator that yields the returned results in the stream.
        explain_options
            (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
            Query profiling options for this stream request.
    """

    def __init__(
        self,
        response_generator: AsyncGenerator[T | query_profile_pb.ExplainMetrics, Any],
        explain_options: Optional[ExplainOptions] = None,
    ):
        self._generator = response_generator
        self._explain_options = explain_options
        # Stays None until an ExplainMetrics message is seen in the stream.
        self._explain_metrics: Optional[ExplainMetrics] = None

    def _filter_item(self, item: T | query_profile_pb.ExplainMetrics) -> T:
        """Return ``item`` unless it is an ExplainMetrics message.

        An ExplainMetrics message is converted and stored on this object, and
        ``StopAsyncIteration`` is raised so the message is never handed to the
        caller as a result.
        """
        if isinstance(item, query_profile_pb.ExplainMetrics):
            self._explain_metrics = ExplainMetrics._from_pb(item)
            raise StopAsyncIteration
        return item

    def __aiter__(self) -> AsyncGenerator[T, Any]:
        return self

    async def __anext__(self) -> T:
        """Return the next result from the inner generator.

        Raises:
            StopAsyncIteration: If the inner generator is exhausted, or if the
                item it produced was an ExplainMetrics message.
        """
        # StopAsyncIteration from the inner generator propagates unchanged.
        return self._filter_item(await self._generator.__anext__())

    async def asend(self, value: Any = None) -> T:
        """Send ``value`` into the inner generator and return its next result.

        The produced item goes through the same ExplainMetrics filtering as
        ``__anext__``.
        """
        return self._filter_item(await self._generator.asend(value))

    async def athrow(self, *args, **kwargs) -> T:
        """Throw an exception into the inner generator.

        All arguments are forwarded to the inner generator's ``athrow``. If it
        produces an item in response, that item goes through the same
        ExplainMetrics filtering as ``__anext__``.
        """
        return self._filter_item(await self._generator.athrow(*args, **kwargs))

    def aclose(self) -> Coroutine[Any, Any, None]:
        """Return the awaitable from the inner generator's ``aclose``."""
        return self._generator.aclose()

    @property
    def explain_options(self) -> ExplainOptions | None:
        """Query profiling options for this stream request."""
        return self._explain_options

    async def get_explain_metrics(self) -> ExplainMetrics:
        """
        Get the metrics associated with the query execution.
        Metrics are only available when explain_options is set on the query. If
        ExplainOptions.analyze is False, only plan_summary is available. If it is
        True, execution_stats is also available.
        :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics`
        :returns: The metrics associated with the query execution.
        :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError`
            if explain_metrics is not available on the query.
        """
        # Metrics already captured while the stream was being consumed.
        if self._explain_metrics is not None:
            return self._explain_metrics

        if self._explain_options is None:
            raise QueryExplainError("explain_options not set on query.")

        if self._explain_options.analyze is not False:
            # The metrics have not been seen in the stream yet, and this
            # method does not advance the stream on the caller's behalf here.
            raise QueryExplainError(
                "explain_metrics not available until query is complete."
            )

        # We need to run the query to get the explain_metrics. Since no
        # query results are returned, it's ok to discard the returned value.
        try:
            await self.__anext__()
        except StopAsyncIteration:
            # Expected: this is how __anext__ signals that it stored the
            # metrics (or that the stream ended); checked just below.
            pass

        if self._explain_metrics is None:
            raise QueryExplainError(
                "Did not receive explain_metrics for this query, despite "
                "explain_options is set and analyze = False."
            )
        return self._explain_metrics