Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/async_query.py: 34%

74 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2020 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 

19a more common way to create a query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from google.api_core import gapic_v1 

24from google.api_core import retry as retries 

25 

26from google.cloud import firestore_v1 

27from google.cloud.firestore_v1.base_query import ( 

28 BaseCollectionGroup, 

29 BaseQuery, 

30 QueryPartition, 

31 _query_response_to_snapshot, 

32 _collection_group_query_response_to_snapshot, 

33 _enum_from_direction, 

34) 

35 

36from google.cloud.firestore_v1 import async_document 

37from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery 

38from google.cloud.firestore_v1.base_document import DocumentSnapshot 

39from typing import AsyncGenerator, List, Optional, Type, TYPE_CHECKING 

40 

41if TYPE_CHECKING: # pragma: NO COVER 

42 # Types needed only for Type Hints 

43 from google.cloud.firestore_v1.transaction import Transaction 

44 from google.cloud.firestore_v1.field_path import FieldPath 

45 

46 

47class AsyncQuery(BaseQuery): 

48 """Represents a query to the Firestore API. 

49 

50 Instances of this class are considered immutable: all methods that 

51 would modify an instance instead return a new instance. 

52 

53 Args: 

54 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`): 

55 The collection that this query applies to. 

56 projection (Optional[:class:`google.cloud.proto.firestore.v1.\ 

57 query.StructuredQuery.Projection`]): 

58 A projection of document fields to limit the query results to. 

59 field_filters (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\ 

60 query.StructuredQuery.FieldFilter`, ...]]): 

61 The filters to be applied in the query. 

62 orders (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\ 

63 query.StructuredQuery.Order`, ...]]): 

64 The "order by" entries to use in the query. 

65 limit (Optional[int]): 

66 The maximum number of documents the query is allowed to return. 

67 offset (Optional[int]): 

68 The number of results to skip. 

69 start_at (Optional[Tuple[dict, bool]]): 

70 Two-tuple of : 

71 

72 * a mapping of fields. Any field that is present in this mapping 

73 must also be present in ``orders`` 

74 * an ``after`` flag 

75 

76 The fields and the flag combine to form a cursor used as 

77 a starting point in a query result set. If the ``after`` 

78 flag is :data:`True`, the results will start just after any 

79 documents which have fields matching the cursor, otherwise 

80 any matching documents will be included in the result set. 

81 When the query is formed, the document values 

82 will be used in the order given by ``orders``. 

83 end_at (Optional[Tuple[dict, bool]]): 

84 Two-tuple of: 

85 

86 * a mapping of fields. Any field that is present in this mapping 

87 must also be present in ``orders`` 

88 * a ``before`` flag 

89 

90 The fields and the flag combine to form a cursor used as 

91 an ending point in a query result set. If the ``before`` 

92 flag is :data:`True`, the results will end just before any 

93 documents which have fields matching the cursor, otherwise 

94 any matching documents will be included in the result set. 

95 When the query is formed, the document values 

96 will be used in the order given by ``orders``. 

97 all_descendants (Optional[bool]): 

98 When false, selects only collections that are immediate children 

99 of the `parent` specified in the containing `RunQueryRequest`. 

100 When true, selects all descendant collections. 

101 recursive (Optional[bool]): 

102 When true, returns all documents and all documents in any subcollections 

103 below them. Defaults to false. 

104 """ 

105 

106 def __init__( 

107 self, 

108 parent, 

109 projection=None, 

110 field_filters=(), 

111 orders=(), 

112 limit=None, 

113 limit_to_last=False, 

114 offset=None, 

115 start_at=None, 

116 end_at=None, 

117 all_descendants=False, 

118 recursive=False, 

119 ) -> None: 

120 super(AsyncQuery, self).__init__( 

121 parent=parent, 

122 projection=projection, 

123 field_filters=field_filters, 

124 orders=orders, 

125 limit=limit, 

126 limit_to_last=limit_to_last, 

127 offset=offset, 

128 start_at=start_at, 

129 end_at=end_at, 

130 all_descendants=all_descendants, 

131 recursive=recursive, 

132 ) 

133 

134 async def _chunkify( 

135 self, chunk_size: int 

136 ) -> AsyncGenerator[List[DocumentSnapshot], None]: 

137 max_to_return: Optional[int] = self._limit 

138 num_returned: int = 0 

139 original: AsyncQuery = self._copy() 

140 last_document: Optional[DocumentSnapshot] = None 

141 

142 while True: 

143 # Optionally trim the `chunk_size` down to honor a previously 

144 # applied limit as set by `self.limit()` 

145 _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size) 

146 

147 # Apply the optionally pruned limit and the cursor, if we are past 

148 # the first page. 

149 _q = original.limit(_chunk_size) 

150 

151 if last_document: 

152 _q = _q.start_after(last_document) 

153 

154 snapshots = await _q.get() 

155 

156 if snapshots: 

157 last_document = snapshots[-1] 

158 

159 num_returned += len(snapshots) 

160 

161 yield snapshots 

162 

163 # Terminate the iterator if we have reached either of two end 

164 # conditions: 

165 # 1. There are no more documents, or 

166 # 2. We have reached the desired overall limit 

167 if len(snapshots) < _chunk_size or ( 

168 max_to_return and num_returned >= max_to_return 

169 ): 

170 return 

171 

172 async def get( 

173 self, 

174 transaction: Transaction = None, 

175 retry: retries.Retry = gapic_v1.method.DEFAULT, 

176 timeout: float = None, 

177 ) -> list: 

178 """Read the documents in the collection that match this query. 

179 

180 This sends a ``RunQuery`` RPC and returns a list of documents 

181 returned in the stream of ``RunQueryResponse`` messages. 

182 

183 Args: 

184 transaction 

185 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

186 An existing transaction that this query will run in. 

187 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

188 should be retried. Defaults to a system-specified policy. 

189 timeout (float): The timeout for this request. Defaults to a 

190 system-specified value. 

191 

192 If a ``transaction`` is used and it already has write operations 

193 added, this method cannot be used (i.e. read-after-write is not 

194 allowed). 

195 

196 Returns: 

197 list: The documents in the collection that match this query. 

198 """ 

199 is_limited_to_last = self._limit_to_last 

200 

201 if self._limit_to_last: 

202 # In order to fetch up to `self._limit` results from the end of the 

203 # query flip the defined ordering on the query to start from the 

204 # end, retrieving up to `self._limit` results from the backend. 

205 for order in self._orders: 

206 order.direction = _enum_from_direction( 

207 self.DESCENDING 

208 if order.direction == self.ASCENDING 

209 else self.ASCENDING 

210 ) 

211 self._limit_to_last = False 

212 

213 result = self.stream(transaction=transaction, retry=retry, timeout=timeout) 

214 result = [d async for d in result] 

215 if is_limited_to_last: 

216 result = list(reversed(result)) 

217 

218 return result 

219 

220 def count( 

221 self, alias: str | None = None 

222 ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]: 

223 """Adds a count over the nested query. 

224 

225 Args: 

226 alias(Optional[str]): Optional name of the field to store the result of the aggregation into. 

227 If not provided, Firestore will pick a default name following the format field_<incremental_id++>. 

228 

229 Returns: 

230 :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`: 

231 An instance of an AsyncAggregationQuery object 

232 """ 

233 return AsyncAggregationQuery(self).count(alias=alias) 

234 

235 def sum( 

236 self, field_ref: str | FieldPath, alias: str | None = None 

237 ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]: 

238 """Adds a sum over the nested query. 

239 

240 Args: 

241 field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across. 

242 alias(Optional[str]): Optional name of the field to store the result of the aggregation into. 

243 If not provided, Firestore will pick a default name following the format field_<incremental_id++>. 

244 

245 Returns: 

246 :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`: 

247 An instance of an AsyncAggregationQuery object 

248 """ 

249 return AsyncAggregationQuery(self).sum(field_ref, alias=alias) 

250 

251 def avg( 

252 self, field_ref: str | FieldPath, alias: str | None = None 

253 ) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]: 

254 """Adds an avg over the nested query. 

255 

256 Args: 

257 field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across. 

258 alias(Optional[str]): Optional name of the field to store the result of the aggregation into. 

259 If not provided, Firestore will pick a default name following the format field_<incremental_id++>. 

260 

261 Returns: 

262 :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`: 

263 An instance of an AsyncAggregationQuery object 

264 """ 

265 return AsyncAggregationQuery(self).avg(field_ref, alias=alias) 

266 

267 async def stream( 

268 self, 

269 transaction=None, 

270 retry: retries.Retry = gapic_v1.method.DEFAULT, 

271 timeout: float = None, 

272 ) -> AsyncGenerator[async_document.DocumentSnapshot, None]: 

273 """Read the documents in the collection that match this query. 

274 

275 This sends a ``RunQuery`` RPC and then returns an iterator which 

276 consumes each document returned in the stream of ``RunQueryResponse`` 

277 messages. 

278 

279 .. note:: 

280 

281 The underlying stream of responses will time out after 

282 the ``max_rpc_timeout_millis`` value set in the GAPIC 

283 client configuration for the ``RunQuery`` API. Snapshots 

284 not consumed from the iterator before that point will be lost. 

285 

286 If a ``transaction`` is used and it already has write operations 

287 added, this method cannot be used (i.e. read-after-write is not 

288 allowed). 

289 

290 Args: 

291 transaction 

292 (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): 

293 An existing transaction that this query will run in. 

294 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

295 should be retried. Defaults to a system-specified policy. 

296 timeout (float): The timeout for this request. Defaults to a 

297 system-specified value. 

298 

299 Yields: 

300 :class:`~google.cloud.firestore_v1.async_document.DocumentSnapshot`: 

301 The next document that fulfills the query. 

302 """ 

303 request, expected_prefix, kwargs = self._prep_stream( 

304 transaction, 

305 retry, 

306 timeout, 

307 ) 

308 

309 response_iterator = await self._client._firestore_api.run_query( 

310 request=request, 

311 metadata=self._client._rpc_metadata, 

312 **kwargs, 

313 ) 

314 

315 async for response in response_iterator: 

316 if self._all_descendants: 

317 snapshot = _collection_group_query_response_to_snapshot( 

318 response, self._parent 

319 ) 

320 else: 

321 snapshot = _query_response_to_snapshot( 

322 response, self._parent, expected_prefix 

323 ) 

324 if snapshot is not None: 

325 yield snapshot 

326 

327 @staticmethod 

328 def _get_collection_reference_class() -> ( 

329 Type["firestore_v1.async_collection.AsyncCollectionReference"] 

330 ): 

331 from google.cloud.firestore_v1.async_collection import AsyncCollectionReference 

332 

333 return AsyncCollectionReference 

334 

335 

336class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup): 

337 """Represents a Collection Group in the Firestore API. 

338 

339 This is a specialization of :class:`.AsyncQuery` that includes all documents in the 

340 database that are contained in a collection or subcollection of the given 

341 parent. 

342 

343 Args: 

344 parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`): 

345 The collection that this query applies to. 

346 """ 

347 

348 def __init__( 

349 self, 

350 parent, 

351 projection=None, 

352 field_filters=(), 

353 orders=(), 

354 limit=None, 

355 limit_to_last=False, 

356 offset=None, 

357 start_at=None, 

358 end_at=None, 

359 all_descendants=True, 

360 recursive=False, 

361 ) -> None: 

362 super(AsyncCollectionGroup, self).__init__( 

363 parent=parent, 

364 projection=projection, 

365 field_filters=field_filters, 

366 orders=orders, 

367 limit=limit, 

368 limit_to_last=limit_to_last, 

369 offset=offset, 

370 start_at=start_at, 

371 end_at=end_at, 

372 all_descendants=all_descendants, 

373 recursive=recursive, 

374 ) 

375 

376 @staticmethod 

377 def _get_query_class(): 

378 return AsyncQuery 

379 

380 async def get_partitions( 

381 self, 

382 partition_count, 

383 retry: retries.Retry = gapic_v1.method.DEFAULT, 

384 timeout: float = None, 

385 ) -> AsyncGenerator[QueryPartition, None]: 

386 """Partition a query for parallelization. 

387 

388 Partitions a query by returning partition cursors that can be used to run the 

389 query in parallel. The returned partition cursors are split points that can be 

390 used as starting/end points for the query results. 

391 

392 Args: 

393 partition_count (int): The desired maximum number of partition points. The 

394 number must be strictly positive. The actual number of partitions 

395 returned may be fewer. 

396 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

397 should be retried. Defaults to a system-specified policy. 

398 timeout (float): The timeout for this request. Defaults to a 

399 system-specified value. 

400 """ 

401 request, kwargs = self._prep_get_partitions(partition_count, retry, timeout) 

402 pager = await self._client._firestore_api.partition_query( 

403 request=request, 

404 metadata=self._client._rpc_metadata, 

405 **kwargs, 

406 ) 

407 

408 start_at = None 

409 async for cursor_pb in pager: 

410 cursor = self._client.document(cursor_pb.values[0].reference_value) 

411 yield QueryPartition(self, start_at, cursor) 

412 start_at = cursor 

413 

414 yield QueryPartition(self, start_at, None)