1# Copyright 2024 Google LLC All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Classes for iterating over stream results for the Google Cloud Firestore API.
16"""
17from __future__ import annotations
18
19from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar
20
21from google.cloud.firestore_v1.query_profile import (
22 ExplainMetrics,
23 QueryExplainError,
24)
25
26if TYPE_CHECKING: # pragma: NO COVER
27 from google.cloud.firestore_v1.query_profile import ExplainOptions
28
29
30T = TypeVar("T")
31
32
33class StreamGenerator(Generator[T, Any, Optional[ExplainMetrics]]):
34 """Generator for the streamed results.
35
36 Args:
37 response_generator (Generator[T, Any, Optional[ExplainMetrics]]):
38 The inner generator that yields the returned document in the stream.
39 explain_options
40 (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
41 Query profiling options for this stream request.
42 """
43
44 def __init__(
45 self,
46 response_generator: Generator[T, Any, Optional[ExplainMetrics]],
47 explain_options: Optional[ExplainOptions] = None,
48 ):
49 self._generator = response_generator
50 self._explain_options = explain_options
51 self._explain_metrics = None
52
53 def __iter__(self) -> StreamGenerator:
54 return self
55
56 def __next__(self) -> T:
57 try:
58 return self._generator.__next__()
59 except StopIteration as e:
60 # If explain_metrics is available, it would be returned.
61 if e.value:
62 self._explain_metrics = ExplainMetrics._from_pb(e.value)
63 raise
64
65 def send(self, value: Any = None) -> T:
66 return self._generator.send(value)
67
68 def throw(self, *args, **kwargs) -> T:
69 return self._generator.throw(*args, **kwargs)
70
71 def close(self):
72 return self._generator.close()
73
74 @property
75 def explain_options(self) -> ExplainOptions | None:
76 """Query profiling options for this stream request."""
77 return self._explain_options
78
79 def get_explain_metrics(self) -> ExplainMetrics:
80 """
81 Get the metrics associated with the query execution.
82 Metrics are only available when explain_options is set on the query. If
83 ExplainOptions.analyze is False, only plan_summary is available. If it is
84 True, execution_stats is also available.
85 :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics`
86 :returns: The metrics associated with the query execution.
87 :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError`
88 if explain_metrics is not available on the query.
89 """
90 if self._explain_metrics is not None:
91 return self._explain_metrics
92 elif self._explain_options is None:
93 raise QueryExplainError("explain_options not set on query.")
94 elif self._explain_options.analyze is False:
95 # We need to run the query to get the explain_metrics. Since no
96 # query results are returned, it's ok to discard the returned value.
97 try:
98 next(self)
99 except StopIteration:
100 pass
101
102 if self._explain_metrics is None:
103 raise QueryExplainError(
104 "Did not receive explain_metrics for this query, despite "
105 "explain_options is set and analyze = False."
106 )
107 else:
108 return self._explain_metrics
109 raise QueryExplainError(
110 "explain_metrics not available until query is complete."
111 )