Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/query.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 

19a more common way to create a query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from typing import ( 

24 TYPE_CHECKING, 

25 Any, 

26 Callable, 

27 Generator, 

28 List, 

29 Optional, 

30 Sequence, 

31 Type, 

32 Union, 

33) 

34 

35from google.api_core import exceptions, gapic_v1 

36from google.api_core import retry as retries 

37 

38from google.cloud import firestore_v1 

39from google.cloud.firestore_v1 import aggregation, transaction 

40from google.cloud.firestore_v1.query_results import QueryResultsList 

41from google.cloud.firestore_v1.base_document import ( 

42 DocumentSnapshot, 

43) 

44from google.cloud.firestore_v1.base_query import ( 

45 BaseCollectionGroup, 

46 BaseQuery, 

47 QueryPartition, 

48 _collection_group_query_response_to_snapshot, 

49 _enum_from_direction, 

50 _query_response_to_snapshot, 

51) 

52from google.cloud.firestore_v1.stream_generator import StreamGenerator 

53from google.cloud.firestore_v1.vector import Vector 

54from google.cloud.firestore_v1.vector_query import VectorQuery 

55from google.cloud.firestore_v1.watch import Watch 

56 

57if TYPE_CHECKING: # pragma: NO COVER 

58 from google.cloud.firestore_v1.base_vector_query import DistanceMeasure 

59 from google.cloud.firestore_v1.field_path import FieldPath 

60 from google.cloud.firestore_v1.query_profile import ExplainMetrics, ExplainOptions 

61 

62 import datetime 

63 

64 

class Query(BaseQuery):
    """Represents a query to the Firestore API.

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.firestore_v1.\
            query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    def get(
        self,
        transaction=None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> QueryResultsList[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of document
        snapshots collected from the stream of ``RunQueryResponse``
        messages.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write
                operations added, this method cannot be used (i.e.
                read-after-write is not allowed).
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a microsecond
                precision timestamp within the past one hour, or if
                Point-in-Time Recovery is enabled, can additionally be a
                whole minute timestamp within the past 7 days. For the most
                accurate results, use UTC timezone.

        Returns:
            QueryResultsList[DocumentSnapshot]: The documents in the
            collection that match this query.
        """
        reverse_results = self._limit_to_last

        if reverse_results:
            # ``limit_to_last`` cannot be expressed to the backend directly.
            # Invert every ordering so that fetching the first
            # ``self._limit`` results of the flipped query retrieves the
            # documents from the end; they are reversed client-side below.
            for order in self._orders:
                order.direction = _enum_from_direction(
                    self.DESCENDING
                    if order.direction.name == self.ASCENDING
                    else self.ASCENDING
                )
            self._limit_to_last = False

        result = self.stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        result_list = list(result)
        if reverse_results:
            result_list = list(reversed(result_list))

        # Profiling metrics only exist when explicitly requested.
        explain_metrics: ExplainMetrics | None = (
            None if explain_options is None else result.get_explain_metrics()
        )

        return QueryResultsList(result_list, explain_options, explain_metrics)

    def _chunkify(
        self, chunk_size: int
    ) -> Generator[List[DocumentSnapshot], None, None]:
        """Yield the query's results in lists of at most ``chunk_size`` docs."""
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: Query = self._copy()
        cursor: Optional[DocumentSnapshot] = None

        while True:
            # Honor any limit previously set via ``self.limit()`` by
            # shrinking the page size as we approach it.
            page_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Build the page query: the (possibly pruned) limit plus, after
            # the first page, a cursor just past the last document returned.
            page_query = original.limit(page_size)
            if cursor:
                page_query = page_query.start_after(cursor)

            snapshots = page_query.get()

            if snapshots:
                cursor = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Stop when either end condition is met:
            # 1. the backend produced a short page (no more documents), or
            # 2. the overall limit has been reached.
            if len(snapshots) < page_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return

    def _get_stream_iterator(
        self, transaction, retry, timeout, explain_options=None, read_time=None
    ):
        """Build the raw ``RunQuery`` response iterator for :meth:`stream`."""
        request, expected_prefix, kwargs = self._prep_stream(
            transaction, retry, timeout, explain_options, read_time
        )

        iterator = self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        return iterator, expected_prefix

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Return ``True`` if the stream should be resumed after ``exc``.

        Snapshot-based retries are never attempted inside a transaction.
        """
        if transaction is not None:
            return False

        if retry is gapic_v1.method.DEFAULT:
            # Fall back to the retry policy configured on the GAPIC
            # ``run_query`` callable.
            transport = self._client._firestore_api._transport
            retry = transport.run_query._retry
        return retry._predicate(exc)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Union[Vector, Sequence[float]],
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> Type["firestore_v1.vector_query.VectorQuery"]:
        """
        Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only
                documents which contain vectors whose dimensionality match
                the query_vector can be returned.
            query_vector (Vector | Sequence[float]): The query vector that we
                are searching on. Must be a vector of no more than 2048
                dimensions.
            limit (int): The number of nearest neighbors to return. Must be a
                positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The Distance Measure
                to use.
            distance_result_field (Optional[str]):
                Name of the field to output the result of the vector distance
                calculation. If unset then the distance will not be returned.
            distance_threshold (Optional[float]):
                A threshold for which no less similar documents will be
                returned.

        Returns:
            :class:`~firestore_v1.vector_query.VectorQuery`: the vector query.
        """
        vector_query = VectorQuery(self)
        return vector_query.find_nearest(
            vector_field=vector_field,
            query_vector=query_vector,
            limit=limit,
            distance_measure=distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )

    def count(
        self, alias: str | None = None
    ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
        """
        Adds a count over the query.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the
            aggregation into. If not provided, Firestore will pick a default
            name following the format field_<incremental_id++>.
        """
        aggregation_query = aggregation.AggregationQuery(self)
        return aggregation_query.count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
        """
        Adds a sum over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the
            aggregation into. If not provided, Firestore will pick a default
            name following the format field_<incremental_id++>.
        """
        aggregation_query = aggregation.AggregationQuery(self)
        return aggregation_query.sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> Type["firestore_v1.aggregation.AggregationQuery"]:
        """
        Adds an avg over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the
            aggregation into. If not provided, Firestore will pick a default
            name following the format field_<incremental_id++>.
        """
        aggregation_query = aggregation.AggregationQuery(self)
        return aggregation_query.avg(field_ref, alias=alias)

    def _make_stream(
        self,
        transaction: Optional[transaction.Transaction] = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[DocumentSnapshot, Any, Optional[ExplainMetrics]]:
        """Internal generator backing :meth:`stream`.

        Sends a ``RunQuery`` RPC and yields each document produced by the
        stream of ``RunQueryResponse`` messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
                Transaction`]):
                An existing transaction that the query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a microsecond
                precision timestamp within the past one hour, or if
                Point-in-Time Recovery is enabled, can additionally be a
                whole minute timestamp within the past 7 days. For the most
                accurate results, use UTC timezone.

        Yields:
            DocumentSnapshot:
                The next document that fulfills the query.

        Returns:
            (Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]):
                The results of query profiling, if received from the service.
        """
        metrics: ExplainMetrics | None = None

        response_iterator, expected_prefix = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
            explain_options,
            read_time,
        )

        last_snapshot = None

        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if not self._retry_query_after_exception(exc, retry, transaction):
                    raise
                # Resume the stream just after the last document already
                # delivered to the caller.
                retry_query = self.start_after(last_snapshot)
                response_iterator, _ = retry_query._get_stream_iterator(
                    transaction,
                    retry,
                    timeout,
                    read_time=read_time,
                )
                continue

            if response is None:  # end of iteration
                break

            # Capture profiling metrics from the first response that
            # carries them.
            if metrics is None and response.explain_metrics:
                metrics = response.explain_metrics

            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )
            if snapshot is not None:
                last_snapshot = snapshot
                yield snapshot

        return metrics

    def stream(
        self,
        transaction: transaction.Transaction | None = None,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
        read_time: Optional[datetime.datetime] = None,
    ) -> StreamGenerator[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns a generator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. Defaults to a
                system-specified policy.
            timeout (Optional[float]): The timeout for this request. Defaults
                to a system-specified value.
            explain_options
                (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
                Options to enable query profiling for this query. When set,
                explain_metrics will be available on the returned generator.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a microsecond
                precision timestamp within the past one hour, or if
                Point-in-Time Recovery is enabled, can additionally be a
                whole minute timestamp within the past 7 days. For the most
                accurate results, use UTC timezone.

        Returns:
            `StreamGenerator[DocumentSnapshot]`: A generator of the query
            results.
        """
        inner_generator = self._make_stream(
            transaction=transaction,
            retry=retry,
            timeout=timeout,
            explain_options=explain_options,
            read_time=read_time,
        )
        return StreamGenerator(inner_generator, explain_options)

    def on_snapshot(self, callback: Callable) -> Watch:
        """Monitor the documents in this collection that match this query.

        This starts a watch on this query using a background thread. The
        provided callback is run on the snapshot of the documents.

        Args:
            callback (Callable[[:class:`~google.cloud.firestore.query.QuerySnapshot`], NoneType]):
                a callback to run when a change occurs.

        Example:

        .. code-block:: python

            from google.cloud import firestore_v1

            db = firestore_v1.Client()
            query_ref = db.collection(u'users').where("user", "==", u'Ada')

            def on_snapshot(docs, changes, read_time):
                for doc in docs:
                    print(u'{} => {}'.format(doc.id, doc.to_dict()))

            # Watch this query
            query_watch = query_ref.on_snapshot(on_snapshot)

            # Terminate this watch
            query_watch.unsubscribe()
        """
        return Watch.for_query(self, callback, DocumentSnapshot)

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.collection.CollectionReference"]
    ):
        # Imported locally to avoid a circular import at module load time.
        from google.cloud.firestore_v1.collection import CollectionReference

        return CollectionReference

562 

563 

class CollectionGroup(Query, BaseCollectionGroup):
    """Represents a Collection Group in the Firestore API.

    This is a specialization of :class:`.Query` that includes all documents
    in the database that are contained in a collection or subcollection of
    the given parent.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        # Same construction as Query, but ``all_descendants`` defaults to
        # True: a collection group spans every matching (sub)collection.
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    @staticmethod
    def _get_query_class():
        """Return the concrete query class partitions are built from."""
        return Query

    def get_partitions(
        self,
        partition_count,
        retry: retries.Retry | object | None = gapic_v1.method.DEFAULT,
        timeout: float | None = None,
        *,
        read_time: Optional[datetime.datetime] = None,
    ) -> Generator[QueryPartition, None, None]:
        """Partition a query for parallelization.

        Partitions a query by returning partition cursors that can be used to
        run the query in parallel. The returned partition cursors are split
        points that can be used as starting/end points for the query results.

        Args:
            partition_count (int): The desired maximum number of partition
                points. The number must be strictly positive. The actual
                number of partitions returned may be fewer.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.
            read_time (Optional[datetime.datetime]): If set, reads documents
                as they were at the given time. This must be a microsecond
                precision timestamp within the past one hour, or if
                Point-in-Time Recovery is enabled, can additionally be a
                whole minute timestamp within the past 7 days. For the most
                accurate results, use UTC timezone.
        """
        request, kwargs = self._prep_get_partitions(
            partition_count, retry, timeout, read_time
        )

        pager = self._client._firestore_api.partition_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        # Each cursor from the backend closes the previous partition and
        # opens the next; the final partition is left open-ended.
        start_at = None
        for cursor_pb in pager:
            cursor = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, start_at, cursor)
            start_at = cursor

        yield QueryPartition(self, start_at, None)