Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/query.py: 33%

99 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Classes for representing queries for the Google Cloud Firestore API. 

16 

17A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from 

18a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be 

19a more common way to create a query than direct usage of the constructor. 

20""" 

21from __future__ import annotations 

22 

23from google.cloud import firestore_v1 

24from google.cloud.firestore_v1.base_document import DocumentSnapshot 

25from google.api_core import exceptions 

26from google.api_core import gapic_v1 

27from google.api_core import retry as retries 

28 

29from google.cloud.firestore_v1.base_query import ( 

30 BaseCollectionGroup, 

31 BaseQuery, 

32 QueryPartition, 

33 _query_response_to_snapshot, 

34 _collection_group_query_response_to_snapshot, 

35 _enum_from_direction, 

36) 

37from google.cloud.firestore_v1 import aggregation 

38 

39from google.cloud.firestore_v1 import document 

40from google.cloud.firestore_v1.watch import Watch 

41from typing import Any, Callable, Generator, List, Optional, Type, TYPE_CHECKING 

42 

43if TYPE_CHECKING: # pragma: NO COVER 

44 from google.cloud.firestore_v1.field_path import FieldPath 

45 

46 

class Query(BaseQuery):
    """Represents a query to the Firestore API.

    Instances of this class are considered immutable: all methods that
    would modify an instance instead return a new instance.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
        projection (Optional[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.Projection`]):
            A projection of document fields to limit the query results to.
        field_filters (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.FieldFilter`, ...]]):
            The filters to be applied in the query.
        orders (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
            query.StructuredQuery.Order`, ...]]):
            The "order by" entries to use in the query.
        limit (Optional[int]):
            The maximum number of documents the query is allowed to return.
        limit_to_last (Optional[bool]):
            When true, :meth:`get` fetches up to ``limit`` results from the
            end of the query (by temporarily reversing ``orders``) and
            returns them in the originally requested order.
        offset (Optional[int]):
            The number of results to skip.
        start_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * an ``after`` flag

            The fields and the flag combine to form a cursor used as
            a starting point in a query result set. If the ``after``
            flag is :data:`True`, the results will start just after any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        end_at (Optional[Tuple[dict, bool]]):
            Two-tuple of:

            * a mapping of fields. Any field that is present in this mapping
              must also be present in ``orders``
            * a ``before`` flag

            The fields and the flag combine to form a cursor used as
            an ending point in a query result set. If the ``before``
            flag is :data:`True`, the results will end just before any
            documents which have fields matching the cursor, otherwise
            any matching documents will be included in the result set.
            When the query is formed, the document values
            will be used in the order given by ``orders``.
        all_descendants (Optional[bool]):
            When false, selects only collections that are immediate children
            of the `parent` specified in the containing `RunQueryRequest`.
            When true, selects all descendant collections.
        recursive (Optional[bool]):
            Forwarded unchanged to the base class constructor; its semantics
            are defined there.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=False,
        recursive=False,
    ) -> None:
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    def get(
        self,
        transaction=None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
    ) -> List[DocumentSnapshot]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        For a ``limit_to_last`` query the ordering is reversed only for the
        duration of the call; the query's ``orders`` and ``limit_to_last``
        state are restored before returning, so calling :meth:`get`
        repeatedly on the same instance yields consistent results.

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
                If a ``transaction`` is used and it already has write operations
                added, this method cannot be used (i.e. read-after-write is not
                allowed).
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            list: The documents in the collection that match this query.
        """
        if not self._limit_to_last:
            return list(
                self.stream(transaction=transaction, retry=retry, timeout=timeout)
            )

        # Remember the caller-visible state so it can be put back afterwards:
        # the order objects are mutated in place and may be shared with other
        # query instances.
        saved_flag = self._limit_to_last
        saved_directions = [order.direction for order in self._orders]
        try:
            # In order to fetch up to `self._limit` results from the end of the
            # query flip the defined ordering on the query to start from the
            # end, retrieving up to `self._limit` results from the backend.
            for order in self._orders:
                order.direction = _enum_from_direction(
                    self.DESCENDING
                    if order.direction.name == self.ASCENDING
                    else self.ASCENDING
                )
            # NOTE(review): the flag is cleared before streaming; presumably
            # the base class refuses to stream limit_to_last queries -- confirm.
            self._limit_to_last = False

            # Materialize inside the ``try`` so the request is built and fully
            # consumed while the flipped ordering is still in effect.
            results = list(
                self.stream(transaction=transaction, retry=retry, timeout=timeout)
            )
        finally:
            for order, direction in zip(self._orders, saved_directions):
                order.direction = direction
            self._limit_to_last = saved_flag

        # The backend returned the tail in reversed order; hand it back in the
        # ordering the caller asked for.
        results.reverse()
        return results

    def _chunkify(
        self, chunk_size: int
    ) -> Generator[List[DocumentSnapshot], None, None]:
        """Yield the query results as successive lists ("pages").

        Each page is fetched with :meth:`get` on a copy of this query limited
        to the page size and, after the first page, started after the last
        document of the previous page.

        Args:
            chunk_size (int): Requested number of documents per page. It is
                passed through ``_resolve_chunk_size`` together with the
                number of documents already returned before being applied.

        Yields:
            List[DocumentSnapshot]: The documents of one page. The final page
            may be shorter than the page size, or empty.
        """
        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: Query = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim the `chunk_size` down to honor a previously
            # applied limits as set by `self.limit()`
            _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and the cursor, if we are past
            # the first page.
            _q = original.limit(_chunk_size)

            if last_document:
                _q = _q.start_after(last_document)

            snapshots = _q.get()

            if snapshots:
                last_document = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            if len(snapshots) < _chunk_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return

    def _get_stream_iterator(self, transaction, retry, timeout):
        """Helper method for :meth:`stream`.

        Builds the request via ``_prep_stream`` and issues the ``run_query``
        call on the client's Firestore API.

        Returns:
            tuple: ``(response_iterator, expected_prefix)`` where
            ``response_iterator`` is the value returned by ``run_query`` and
            ``expected_prefix`` is the prefix produced by ``_prep_stream``.
        """
        request, expected_prefix, kwargs = self._prep_stream(
            transaction,
            retry,
            timeout,
        )

        response_iterator = self._client._firestore_api.run_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        return response_iterator, expected_prefix

    def _retry_query_after_exception(self, exc, retry, transaction):
        """Helper method for :meth:`stream`.

        Decide whether ``exc`` should trigger a re-issue of the query.

        Returns:
            ``False`` whenever a transaction is in use; otherwise the result
            of the retry policy's predicate applied to ``exc``. When ``retry``
            is the ``DEFAULT`` sentinel, the policy attached to the
            transport's ``run_query`` callable is used.
        """
        if transaction is None:  # no snapshot-based retry inside transaction
            if retry is gapic_v1.method.DEFAULT:
                transport = self._client._firestore_api._transport
                gapic_callable = transport.run_query
                retry = gapic_callable._retry
            return retry._predicate(exc)

        return False

    def count(
        self, alias: str | None = None
    ) -> "firestore_v1.aggregation.AggregationQuery":
        """
        Adds a count over the query.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).count(alias=alias)

    def sum(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> "firestore_v1.aggregation.AggregationQuery":
        """
        Adds a sum over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).sum(field_ref, alias=alias)

    def avg(
        self, field_ref: str | FieldPath, alias: str | None = None
    ) -> "firestore_v1.aggregation.AggregationQuery":
        """
        Adds an avg over the query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return aggregation.AggregationQuery(self).avg(field_ref, alias=alias)

    def stream(
        self,
        transaction=None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: Optional[float] = None,
    ) -> Generator[document.DocumentSnapshot, Any, None]:
        """Read the documents in the collection that match this query.

        This sends a ``RunQuery`` RPC and then returns an iterator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API.  Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Yields:
            :class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
            The next document that fulfills the query.
        """
        response_iterator, expected_prefix = self._get_stream_iterator(
            transaction,
            retry,
            timeout,
        )

        last_snapshot = None

        while True:
            try:
                response = next(response_iterator, None)
            except exceptions.GoogleAPICallError as exc:
                if not self._retry_query_after_exception(exc, retry, transaction):
                    raise

                # Resume just past the last document handed to the caller.
                # If the failure happened before anything was yielded there
                # is no cursor to resume from, so re-issue the query as-is
                # instead of building a cursor from ``None``.
                # NOTE(review): the resumed query keeps the original limit and
                # offset, and the number of re-issues is not bounded here.
                if last_snapshot is None:
                    new_query = self
                else:
                    new_query = self.start_after(last_snapshot)

                response_iterator, _ = new_query._get_stream_iterator(
                    transaction,
                    retry,
                    timeout,
                )
                continue

            if response is None:  # EOI
                break

            if self._all_descendants:
                snapshot = _collection_group_query_response_to_snapshot(
                    response, self._parent
                )
            else:
                snapshot = _query_response_to_snapshot(
                    response, self._parent, expected_prefix
                )

            # Responses that do not convert to a snapshot are skipped.
            if snapshot is not None:
                last_snapshot = snapshot
                yield snapshot

    def on_snapshot(self, callback: Callable) -> Watch:
        """Monitor the documents in this collection that match this query.

        This starts a watch on this query using a background thread. The
        provided callback is run on the snapshot of the documents.

        Args:
            callback(Callable[[:class:`~google.cloud.firestore.query.QuerySnapshot`], NoneType]):
                a callback to run when a change occurs.

        Example:

        .. code-block:: python

            from google.cloud import firestore_v1

            db = firestore_v1.Client()
            query_ref = db.collection(u'users').where("user", "==", u'Ada')

            def on_snapshot(docs, changes, read_time):
                for doc in docs:
                    print(u'{} => {}'.format(doc.id, doc.to_dict()))

            # Watch this query
            query_watch = query_ref.on_snapshot(on_snapshot)

            # Terminate this watch
            query_watch.unsubscribe()
        """
        return Watch.for_query(self, callback, document.DocumentSnapshot)

    @staticmethod
    def _get_collection_reference_class() -> (
        Type["firestore_v1.collection.CollectionReference"]
    ):
        """Return the ``CollectionReference`` class.

        The import is done inside the function rather than at module level.
        NOTE(review): presumably to avoid a circular import -- confirm.
        """
        from google.cloud.firestore_v1.collection import CollectionReference

        return CollectionReference

396 

397 

class CollectionGroup(Query, BaseCollectionGroup):
    """A Collection Group in the Firestore API.

    A specialization of :class:`.Query` covering all documents in the
    database that are contained in a collection or subcollection of the
    given parent. Unlike :class:`.Query`, ``all_descendants`` defaults to
    :data:`True`.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
        recursive=False,
    ) -> None:
        # Every argument is handed to the next initializer by keyword.
        super().__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
            recursive=recursive,
        )

    @staticmethod
    def _get_query_class():
        """Return the :class:`Query` class."""
        return Query

    def get_partitions(
        self,
        partition_count,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Generator[QueryPartition, None, None]:
        """Partition a query for parallelization.

        Issues a ``partition_query`` call and turns the returned cursors into
        consecutive :class:`QueryPartition` objects: each cursor closes one
        partition and opens the next. The first partition has no start
        cursor and the last one has no end cursor.

        Args:
            partition_count (int): The desired maximum number of partition points. The
                number must be strictly positive. The actual number of partitions
                returned may be fewer.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried. Defaults to a system-specified policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Yields:
            QueryPartition: One partition per returned cursor, followed by a
            final open-ended partition.
        """
        request, kwargs = self._prep_get_partitions(partition_count, retry, timeout)
        api = self._client._firestore_api
        cursor_pages = api.partition_query(
            request=request,
            metadata=self._client._rpc_metadata,
            **kwargs,
        )

        lower_bound = None
        for cursor_pb in cursor_pages:
            # The first cursor value carries the document reference that
            # marks the split point.
            reference = cursor_pb.values[0].reference_value
            upper_bound = self._client.document(reference)
            yield QueryPartition(self, lower_bound, upper_bound)
            lower_bound = upper_bound

        # Trailing partition: everything after the last split point.
        yield QueryPartition(self, lower_bound, None)