Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/async_query.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1# Copyright 2020 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 

19a more common way to create a query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from typing import ( 

24 TYPE_CHECKING, 

25 Any, 

26 AsyncGenerator, 

27 List, 

28 Optional, 

29 Type, 

30 Union, 

31 Sequence, 

32) 

33 

34from google.api_core import gapic_v1 

35from google.api_core import retry_async as retries 

36 

37from google.cloud import firestore_v1 

38from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery 

39from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator 

40from google.cloud.firestore_v1.async_vector_query import AsyncVectorQuery 

41from google.cloud.firestore_v1.base_query import ( 

42 BaseCollectionGroup, 

43 BaseQuery, 

44 QueryPartition, 

45 _collection_group_query_response_to_snapshot, 

46 _enum_from_direction, 

47 _query_response_to_snapshot, 

48) 

49from google.cloud.firestore_v1.query_results import QueryResultsList 

50 

51if TYPE_CHECKING: # pragma: NO COVER 

52 import datetime 

53 

54 # Types needed only for Type Hints 

55 from google.cloud.firestore_v1.async_transaction import AsyncTransaction 

56 from google.cloud.firestore_v1.base_document import DocumentSnapshot 

57 from google.cloud.firestore_v1.base_vector_query import DistanceMeasure 

58 from google.cloud.firestore_v1.field_path import FieldPath 

59 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 

60 import google.cloud.firestore_v1.types.query_profile as query_profile_pb 

61 from google.cloud.firestore_v1.vector import Vector 

62 

63 

class AsyncQuery(BaseQuery):
    """Represents a query to the Firestore API (asynchronous flavour).

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.firestore_v1.query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.firestore_v1.query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.firestore_v1.query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        limit_to_last (Optional[bool]):
            When true, :meth:`get` fetches up to ``limit`` results from the
            end of the ordered result set (it runs the query with the
            ordering flipped and reverses what comes back). Defaults to false.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
        recursive (Optional[bool]):
            When true, returns all documents and all documents in any subcollections
            below them. Defaults to false.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    async def _chunkify(
        self, chunk_size: int
    ) -> AsyncGenerator[List[DocumentSnapshot], None]:
        """Yield the query results page by page.

        Each page is fetched with :meth:`get` on a copy of this query whose
        limit is the (possibly trimmed) ``chunk_size`` and whose start cursor
        is placed just after the last document of the previous page.

        Args:
            chunk_size (int): The requested number of documents per page.

        Yields:
            List[DocumentSnapshot]: One page of results. The page is yielded
            before the termination check, so the final page may be shorter
            than ``chunk_size`` (or empty).
        """
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: AsyncQuery = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim the `chunk_size` down to honor a previously
            # applied limit as set by `self.limit()`
            _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and the cursor, if we are past
            # the first page.
            _q = original.limit(_chunk_size)

            # Explicit None check: the cursor must be applied whenever a
            # previous page produced a document, whatever its truthiness.
            if last_document is not None:
                _q = _q.start_after(last_document)

            snapshots = await _q.get()

            if snapshots:
                last_document = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            # The limit is compared with `is not None` so that an overall
            # limit of 0 is honoured instead of being read as "no limit",
            # which could otherwise keep this loop running forever.
            reached_limit = (
                max_to_return is not None and num_returned >= max_to_return
            )
            if len(snapshots) < _chunk_size or reached_limit:
                return

    async def get(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry_async.AsyncRetry]): Designation
                of what errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                the explain metrics are fetched from the stream and passed to
                the returned ``QueryResultsList``.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Returns:
            QueryResultsList[DocumentSnapshot]: The documents in the collection
            that match this query.
        """
        explain_metrics: ExplainMetrics | None = None

        is_limited_to_last = self._limit_to_last
        saved_directions = []

        if is_limited_to_last:
            # In order to fetch up to `self._limit` results from the end of the
            # query flip the defined ordering on the query to start from the
            # end, retrieving up to `self._limit` results from the backend.
            # The flip is temporary: the original directions are remembered
            # here and restored below so the query stays reusable.
            saved_directions = [order.direction for order in self._orders]
            ascending = _enum_from_direction(self.ASCENDING)
            descending = _enum_from_direction(self.DESCENDING)
            for order in self._orders:
                # Accept both the enum and the string constant: comparing the
                # stored direction against the string alone may never match.
                # NOTE(review): assumes `order.direction` is an enum that is
                # not equal to its string name -- confirm against proto-plus.
                is_ascending = order.direction in (ascending, self.ASCENDING)
                order.direction = descending if is_ascending else ascending
            self._limit_to_last = False

        try:
            result = self.stream(
                transaction=transaction,
                retry=retry,
                timeout=timeout,
                explain_options=explain_options,
                read_time=read_time,
            )
            try:
                # The request is only prepared once iteration starts (see
                # `_make_stream`), so the flipped state must stay in place
                # until the stream has been consumed.
                result_list = [d async for d in result]
                if is_limited_to_last:
                    result_list.reverse()

                if explain_options is not None:
                    explain_metrics = await result.get_explain_metrics()
            finally:
                await result.aclose()
        finally:
            if is_limited_to_last:
                # Undo the temporary flip, even if the RPC failed.
                # NOTE(review): overlapping get() calls on the same instance
                # would still observe the temporary state.
                for order, direction in zip(self._orders, saved_directions):
                    order.direction = direction
                self._limit_to_last = True

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Union[Vector, Sequence[float]],
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> AsyncVectorQuery:
        """
        Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only documents which contain
                vectors whose dimensionality match the query_vector can be returned.
            query_vector (Vector | Sequence[float]): The query vector that we are searching on. Must be a vector of no more
                than 2048 dimensions.
            limit (int): The number of nearest neighbors to return. Must be a positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The Distance Measure to use.
            distance_result_field (Optional[str]):
                Name of the field to output the result of the vector distance
                calculation. If unset then the distance will not be returned.
            distance_threshold (Optional[float]):
                A threshold for which no less similar documents will be returned.

        Returns:
            :class:`~google.cloud.firestore_v1.async_vector_query.AsyncVectorQuery`:
            the vector query wrapping this query.
        """
        return AsyncVectorQuery(self).find_nearest(
            vector_field=vector_field,
            query_vector=query_vector,
            limit=limit,
            distance_measure=distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )

    def count(self, alias: str | None = None) -> AsyncAggregationQuery:
        """Adds a count over the nested query.

        Args:
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
            An instance of an AsyncAggregationQuery object
        """
        return AsyncAggregationQuery(self).count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> AsyncAggregationQuery:
        """Adds a sum over the nested query.

        Args:
            field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
            An instance of an AsyncAggregationQuery object
        """
        return AsyncAggregationQuery(self).sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> AsyncAggregationQuery:
        """Adds an avg over the nested query.

        Args:
            field_ref(Union[str, google.cloud.firestore_v1.field_path.FieldPath]): The field to aggregate across.
            alias(Optional[str]): Optional name of the field to store the result of the aggregation into.
                If not provided, Firestore will pick a default name following the format field_<incremental_id++>.

        Returns:
            :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
            An instance of an AsyncAggregationQuery object
        """
        return AsyncAggregationQuery(self).avg(field_ref, alias=alias)

    async def _make_stream(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncGenerator[DocumentSnapshot | query_profile_pb.ExplainMetrics, Any]:
        """Internal method for stream(). Read the documents in the collection
        that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry_async.AsyncRetry]): Designation
                of what errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Yields:
            [:class:`~google.cloud.firestore_v1.base_document.DocumentSnapshot`
            | google.cloud.firestore_v1.types.query_profile.ExplainMetrics]:
            The next document that fulfills the query. Query results will be
            yielded as `DocumentSnapshot`. When the result contains returned
            explain metrics, yield `query_profile_pb.ExplainMetrics` individually.
        """
        # Being an async generator, nothing here runs until the first item
        # is requested; the request is built from the query state at that time.
        request, expected_prefix, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        response_iterator = await self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        async for response in response_iterator:
            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )
            # A response may carry no document; only real snapshots are yielded.
            if snapshot is not None:
                yield snapshot

            # Explain metrics, when present, are yielded as their own item.
            if response.explain_metrics:
                metrics = response.explain_metrics
                yield metrics

    def stream(
        self,
        transaction: Optional[AsyncTransaction] = None,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncStreamGenerator[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry_async.AsyncRetry]): Designation
                of what errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Returns:
            `AsyncStreamGenerator[DocumentSnapshot]`:
            An asynchronous generator of the query results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return AsyncStreamGenerator(inner_generator, explain_options)

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.async_collection.AsyncCollectionReference"]
    ):
        """Return the ``AsyncCollectionReference`` class.

        The import is done inside the function rather than at module level.
        """
        from google.cloud.firestore_v1.async_collection import AsyncCollectionReference

        return AsyncCollectionReference

495 

496 

class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
    """Represents a Collection Group in the Firestore API.

    A specialization of :class:`.AsyncQuery` covering every document in the
    database that lives in a collection or subcollection of the given parent.
    Unlike :class:`.AsyncQuery`, ``all_descendants`` defaults to ``True``.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        # Every option is forwarded unchanged, by keyword, to the base query.
        query_options = {
            "projection": projection,
            "field_filters": field_filters,
            "orders": orders,
            "limit": limit,
            "limit_to_last": limit_to_last,
            "offset": offset,
            "start_at": start_at,
            "end_at": end_at,
            "all_descendants": all_descendants,
            "recursive": recursive,
        }
        super().__init__(parent=parent, **query_options)

    @staticmethod
    def _get_query_class():
        """Return the query class associated with this collection group."""
        return AsyncQuery

    async def get_partitions(
        self,
        partition_count,
        retry: retries.AsyncRetry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> AsyncGenerator[QueryPartition, None]:
        """Partition a query for parallelization.

        Partitions a query by returning partition cursors that can be used to run the
        query in parallel. The returned partition cursors are split points that can be
        used as starting/end points for the query results.

        Args:
            partition_count (int): The desired maximum number of partition points. The
                number must be strictly positive. The actual number of partitions
                returned may be fewer.
            retry (google.api_core.retry_async.AsyncRetry): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
                time. This must be a microsecond precision timestamp within the past one hour, or
                if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
                within the past 7 days. For the most accurate results, use UTC timezone.

        Yields:
            QueryPartition: One partition per cursor received, running from
            the previous cursor (``None`` for the first) to that cursor,
            followed by a final partition from the last cursor with no end.
        """
        request, rpc_kwargs = self._prep_get_partitions(
            partition_count, retry, timeout, read_time
        )

        client = self._client
        cursor_pager = await client._firestore_api.partition_query(
            request=request,
            metadata=client._rpc_metadata,
            **rpc_kwargs,
        )

        lower_bound = None
        async for cursor_pb in cursor_pager:
            # The first value of each cursor is used as a document reference.
            split_point = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, lower_bound, split_point)
            lower_bound = split_point

        # Trailing partition: from the last split point, with no end cursor.
        yield QueryPartition(self, lower_bound, None)