Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_stream_generator.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

46 statements  

1# Copyright 2024 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for iterating over stream results async for the Google Cloud 

16Firestore API. 

17""" 

18from __future__ import annotations 

19 

20from typing import TYPE_CHECKING, Any, AsyncGenerator, Coroutine, Optional, TypeVar 

21 

22from google.cloud.firestore_v1.query_profile import ( 

23 ExplainMetrics, 

24 QueryExplainError, 

25) 

26import google.cloud.firestore_v1.types.query_profile as query_profile_pb 

27 

28if TYPE_CHECKING: # pragma: NO COVER 

29 from google.cloud.firestore_v1.query_profile import ExplainOptions 

30 

31 

# Type of the result items yielded by the stream generator.
T = TypeVar("T")


class AsyncStreamGenerator(AsyncGenerator[T, Any]):
    """Asynchronous Generator for the streamed results.

    Wraps an inner async generator and forwards the items it yields. When the
    inner generator yields an ``ExplainMetrics`` protobuf message, that message
    is not returned to the caller: it is converted, stored so that
    :meth:`get_explain_metrics` can return it, and the iteration is ended.

    Args:
        response_generator (AsyncGenerator):
            The inner generator that yields the returned results in the stream.
        explain_options
            (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
            Query profiling options for this stream request.
    """

    def __init__(
        self,
        response_generator: AsyncGenerator[T | query_profile_pb.ExplainMetrics, Any],
        explain_options: Optional[ExplainOptions] = None,
    ):
        """Store the inner generator and the profiling options."""
        self._generator = response_generator
        self._explain_options = explain_options
        # Filled in by __anext__ once the stream yields its explain metrics.
        self._explain_metrics = None

    def __aiter__(self) -> AsyncGenerator[T, Any]:
        """Return this object; it is its own async iterator."""
        return self

    async def __anext__(self) -> T:
        """Return the next item from the inner generator.

        :raises StopAsyncIteration: when the inner generator is exhausted, or
            when it yields an ``ExplainMetrics`` message (which is stored
            instead of being returned).
        """
        # StopAsyncIteration raised by the inner generator propagates as-is;
        # no handler is needed just to re-raise it.
        next_value = await self._generator.__anext__()

        # Exact type match: only the protobuf ExplainMetrics message itself
        # is intercepted here.
        if type(next_value) is query_profile_pb.ExplainMetrics:
            self._explain_metrics = ExplainMetrics._from_pb(next_value)
            raise StopAsyncIteration

        return next_value

    def asend(self, value: Any = None) -> Coroutine[Any, Any, T]:
        """Delegate ``asend`` directly to the inner generator."""
        return self._generator.asend(value)

    def athrow(self, *args, **kwargs) -> Coroutine[Any, Any, T]:
        """Delegate ``athrow`` directly to the inner generator."""
        return self._generator.athrow(*args, **kwargs)

    def aclose(self) -> Coroutine[Any, Any, None]:
        """Delegate ``aclose`` directly to the inner generator."""
        return self._generator.aclose()

    @property
    def explain_options(self) -> ExplainOptions | None:
        """Query profiling options for this stream request."""
        return self._explain_options

    async def get_explain_metrics(self) -> ExplainMetrics:
        """
        Get the metrics associated with the query execution.
        Metrics are only available when explain_options is set on the query. If
        ExplainOptions.analyze is False, only plan_summary is available. If it is
        True, execution_stats is also available.
        :rtype: :class:`~google.cloud.firestore_v1.query_profile.ExplainMetrics`
        :returns: The metrics associated with the query execution.
        :raises: :class:`~google.cloud.firestore_v1.query_profile.QueryExplainError`
            if explain_metrics is not available on the query.
        """
        # Metrics already captured by an earlier __anext__ call.
        if self._explain_metrics is not None:
            return self._explain_metrics

        if self._explain_options is None:
            raise QueryExplainError("explain_options not set on query.")

        if self._explain_options.analyze is not False:
            # With analyze enabled the metrics are only captured by iterating
            # the stream; this method does not consume it on the caller's
            # behalf.
            raise QueryExplainError(
                "explain_metrics not available until query is complete."
            )

        # We need to run the query to get the explain_metrics. Since no
        # query results are returned, it's ok to discard the returned value.
        try:
            await self.__anext__()
        except StopAsyncIteration:
            pass

        if self._explain_metrics is None:
            raise QueryExplainError(
                "Did not receive explain_metrics for this query, despite "
                "explain_options is set and analyze = False."
            )
        return self._explain_metrics

114 "explain_metrics not available until query is complete." 

115 )