Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/firestore_v1/base_client.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

188 statements  

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Client for interacting with the Google Cloud Firestore API. 

16 

17This is the base from which all interactions with the API occur. 

18 

19In the hierarchy of API concepts 

20 

21* a :class:`~google.cloud.firestore_v1.client.Client` owns a 

22 :class:`~google.cloud.firestore_v1.collection.CollectionReference` 

23* a :class:`~google.cloud.firestore_v1.client.Client` owns a 

24 :class:`~google.cloud.firestore_v1.document.DocumentReference` 

25""" 

26from __future__ import annotations 

27 

28import datetime 

29import os 

30from typing import ( 

31 Any, 

32 AsyncGenerator, 

33 Awaitable, 

34 Generator, 

35 Iterable, 

36 List, 

37 Optional, 

38 Tuple, 

39 Union, 

40 Type, 

41) 

42 

43import google.api_core.client_options 

44import google.api_core.path_template 

45import grpc # type: ignore 

46from google.api_core import retry as retries 

47from google.api_core.gapic_v1 import client_info 

48from google.auth.credentials import AnonymousCredentials 

49from google.cloud.client import ClientWithProject # type: ignore 

50 

51from google.cloud.firestore_v1 import __version__, _helpers, types 

52from google.cloud.firestore_v1.base_batch import BaseWriteBatch 

53 

54# Types needed only for Type Hints 

55from google.cloud.firestore_v1.base_collection import BaseCollectionReference 

56from google.cloud.firestore_v1.base_document import ( 

57 BaseDocumentReference, 

58 DocumentSnapshot, 

59) 

60from google.cloud.firestore_v1.base_query import BaseQuery 

61from google.cloud.firestore_v1.base_transaction import MAX_ATTEMPTS, BaseTransaction 

62from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions 

63from google.cloud.firestore_v1.field_path import render_field_path 

64from google.cloud.firestore_v1.services.firestore import client as firestore_client 

65from google.cloud.firestore_v1.pipeline_source import PipelineSource 

66from google.cloud.firestore_v1.base_pipeline import _BasePipeline 

67 

DEFAULT_DATABASE = "(default)"
"""str: The default database used in a :class:`~google.cloud.firestore_v1.client.Client`."""

# Name of the environment variable read by ``BaseClient.__init__`` to decide
# whether the client should talk to a local emulator.
_FIRESTORE_EMULATOR_HOST: str = "FIRESTORE_EMULATOR_HOST"

# Project ID used in emulator mode when no project is passed and neither
# ``GOOGLE_CLOUD_PROJECT`` nor ``GCLOUD_PROJECT`` is set.
_DEFAULT_EMULATOR_PROJECT = "google-cloud-firestore-emulator"

# Default ``client_info`` for ``BaseClient.__init__``; carries the library
# version.
_CLIENT_INFO: Any = client_info.ClientInfo(client_library_version=__version__)

# Error message raised by ``BaseClient.write_option`` for bad keyword arguments.
_BAD_OPTION_ERR = "Exactly one of ``last_update_time`` or ``exists`` must be provided."

# Error template used by ``_get_reference`` when a returned document path was
# not among the requested references.
_BAD_DOC_TEMPLATE: str = (
    "Document {!r} appeared in response but was not present among references"
)

# Transaction state messages.
_ACTIVE_TXN: str = "There is already an active transaction."
_INACTIVE_TXN: str = "There is no active transaction."

81 

82 

class BaseClient(ClientWithProject):
    """Client for interacting with Google Cloud Firestore API.

    .. note::

        Since the Cloud Firestore API requires the gRPC transport, no
        ``_http`` argument is accepted by this class.

    Args:
        project (Optional[str]): The project which the client acts on behalf
            of. If not passed, falls back to the default inferred
            from the environment.
        credentials (Optional[~google.auth.credentials.Credentials]): The
            OAuth2 Credentials to use for this client. If not passed, falls
            back to the default inferred from the environment.
        database (Optional[str]): The database name that the client targets.
            If not passed (or falsy), :attr:`DEFAULT_DATABASE` is used.
        client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
            The client info used to send a user-agent string along with API
            requests. If ``None``, then default info will be used. Generally,
            you only need to set this if you're developing your own library
            or partner tool.
        client_options (Union[dict, google.api_core.client_options.ClientOptions]):
            Client options used to set user options on the client. API Endpoint
            should be set through client_options.
    """

    SCOPE = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/datastore",
    )
    """The scopes required for authenticating with the Firestore service."""

    # Lazily populated caches backing the helpers/properties below.
    _firestore_api_internal = None
    _database_string_internal = None
    _rpc_metadata_internal = None

    def __init__(
        self,
        project=None,
        credentials=None,
        database=None,
        client_info=_CLIENT_INFO,
        client_options=None,
    ) -> None:
        database = database or DEFAULT_DATABASE
        # NOTE: This API has no use for the _http argument, but sending it
        #       will have no impact since the _http() @property only lazily
        #       creates a working HTTP object.
        self._emulator_host = os.getenv(_FIRESTORE_EMULATOR_HOST)

        if self._emulator_host is not None:
            # Emulator mode: fill in credentials and project when the caller
            # did not supply them.
            if credentials is None:
                credentials = AnonymousCredentials()
            if project is None:
                # extract project from env var, or use system default
                project = (
                    os.getenv("GOOGLE_CLOUD_PROJECT")
                    or os.getenv("GCLOUD_PROJECT")
                    or _DEFAULT_EMULATOR_PROJECT
                )

        super().__init__(
            project=project,
            credentials=credentials,
            client_options=client_options,
            _http=None,
        )
        self._client_info = client_info
        # A non-empty dict is normalized to a ClientOptions object; any other
        # value (including ``None`` and an empty dict) is stored unchanged.
        if client_options and isinstance(client_options, dict):
            client_options = google.api_core.client_options.from_dict(client_options)
        self._client_options = client_options

        self._database = database

    def _firestore_api_helper(self, transport, client_class, client_module) -> Any:
        """Lazy-loading getter for the GAPIC Firestore API client.

        On first use a channel and transport are created and the GAPIC client
        is built and cached; later calls return the cached client.

        Args:
            transport: The transport class; used for ``create_channel`` and
                instantiated with the resulting channel.
            client_class: The GAPIC client class to instantiate.
            client_module: Module whose ``_client_info`` attribute is set to
                this client's ``client_info``.

        Returns:
            The GAPIC client with the credentials of the current client.
        """
        if self._firestore_api_internal is None:
            # Use a custom channel.
            # We need this in order to set appropriate keepalive options.
            if self._emulator_host is not None:
                channel = self._emulator_channel(transport)
            else:
                channel = transport.create_channel(
                    self._target,
                    credentials=self._credentials,
                    options={"grpc.keepalive_time_ms": 30000}.items(),
                )

            self._transport = transport(host=self._target, channel=channel)

            self._firestore_api_internal = client_class(
                transport=self._transport, client_options=self._client_options
            )
            client_module._client_info = self._client_info

        return self._firestore_api_internal

    def _emulator_channel(self, transport):
        """
        Creates an insecure channel to communicate with the local emulator.
        If credentials are provided the token is extracted and added to the
        headers. This supports local testing of firestore rules if the credentials
        have been created from a signed custom token.

        :return: grpc.Channel or grpc.aio.Channel
        """
        # Insecure channels are used for the emulator as secure channels
        # cannot be used to communicate on some environments.
        # https://github.com/googleapis/python-firestore/issues/359
        # Default the token to a non-empty string, in this case "owner".
        token = "owner"
        if (
            self._credentials is not None
            and getattr(self._credentials, "id_token", None) is not None
        ):
            token = self._credentials.id_token
        options = [("Authorization", f"Bearer {token}")]

        # The transport class name decides between the asyncio and the
        # synchronous gRPC channel.
        if "GrpcAsyncIOTransport" in str(transport.__name__):
            return grpc.aio.insecure_channel(self._emulator_host, options=options)
        return grpc.insecure_channel(self._emulator_host, options=options)

    def _target_helper(self, client_class) -> str:
        """Return the target (where the API is).
        Eg. "firestore.googleapis.com"

        Precedence: the emulator host, then ``client_options.api_endpoint``,
        then ``client_class.DEFAULT_ENDPOINT``.

        Returns:
            str: The location of the API.
        """
        if self._emulator_host is not None:
            return self._emulator_host
        if self._client_options and self._client_options.api_endpoint:
            return self._client_options.api_endpoint
        return client_class.DEFAULT_ENDPOINT

    @property
    def _target(self) -> str:
        """Return the target (where the API is).
        Eg. "firestore.googleapis.com"

        Returns:
            str: The location of the API.
        """
        return self._target_helper(firestore_client.FirestoreClient)

    @property
    def _database_string(self) -> str:
        """The database string corresponding to this client's project.

        This value is lazy-loaded and cached.

        Will be of the form

            ``projects/{project_id}/databases/{database_id}``

        where ``database_id`` is the database this client was constructed
        with (:attr:`DEFAULT_DATABASE` unless one was passed).

        Returns:
            str: The fully-qualified database string for the current
            project. (The database is also in this string.)
        """
        if self._database_string_internal is None:
            self._database_string_internal = google.api_core.path_template.expand(
                "projects/{project}/databases/{database}",
                project=self.project,
                database=self._database,
            )

        return self._database_string_internal

    @property
    def _rpc_metadata(self):
        """The RPC metadata for this client's associated database.

        This value is lazy-loaded and cached.

        Returns:
            Sequence[Tuple(str, str)]: RPC metadata with resource prefix
            for the database associated with this client.
        """
        if self._rpc_metadata_internal is None:
            self._rpc_metadata_internal = _helpers.metadata_with_prefix(
                self._database_string
            )

            if self._emulator_host is not None:
                # The emulator requires additional metadata to be set.
                self._rpc_metadata_internal.append(("authorization", "Bearer owner"))

        return self._rpc_metadata_internal

    def collection(self, *collection_path) -> BaseCollectionReference:
        """Not implemented on the base class."""
        raise NotImplementedError

    def collection_group(self, collection_id: str) -> BaseQuery:
        """Not implemented on the base class."""
        raise NotImplementedError

    def _get_collection_reference(
        self, collection_id: str
    ) -> BaseCollectionReference[BaseQuery]:
        """Checks validity of collection_id and then uses subclasses collection implementation.

        Args:
            collection_id (str) Identifies the collections to query over.

                Every collection or subcollection with this ID as the last segment of its
                path will be included. Cannot contain a slash.

        Returns:
            The created collection.

        Raises:
            ValueError: If ``collection_id`` contains a ``/``.
        """
        if "/" in collection_id:
            raise ValueError(
                f"Invalid collection_id {collection_id}. "
                "Collection IDs must not contain '/'."
            )

        return self.collection(collection_id)

    def document(self, *document_path) -> BaseDocumentReference:
        """Not implemented on the base class."""
        raise NotImplementedError

    def bulk_writer(self, options: Optional[BulkWriterOptions] = None) -> BulkWriter:
        """Get a BulkWriter instance from this client.

        Args:
            options (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriterOptions`]):
                Optional control parameters for the
                :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter` returned.

        Returns:
            :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`:
                A utility to efficiently create and save many `WriteBatch` instances
                to the server.
        """
        return BulkWriter(client=self, options=options)

    def _document_path_helper(self, *document_path) -> List[str]:
        """Standardize the format of path to a list of path segments and strip the database string from path if present.

        Args:
            document_path (Tuple[str, ...]): Can either be

                * A single ``/``-delimited path to a document
                * A tuple of document path segments

        Returns:
            List[str]: The path segments, without the leading
            ``{database_string}/documents/`` prefix when the path had one.
        """
        path = _path_helper(document_path)
        base_path = f"{self._database_string}/documents/"
        joined_path = _helpers.DOCUMENT_PATH_DELIMITER.join(path)
        if joined_path.startswith(base_path):
            joined_path = joined_path[len(base_path) :]
        return joined_path.split(_helpers.DOCUMENT_PATH_DELIMITER)

    def recursive_delete(
        self,
        reference,
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: int = 5000,
    ) -> int | Awaitable[int]:
        """Not implemented on the base class."""
        raise NotImplementedError

    @staticmethod
    def field_path(*field_names: str) -> str:
        """Create a **field path** from a list of nested field names.

        A **field path** is a ``.``-delimited concatenation of the field
        names. It is used to represent a nested field. For example,
        in the data

        .. code-block:: python

           data = {
              'aa': {
                  'bb': {
                      'cc': 10,
                  },
              },
           }

        the field path ``'aa.bb.cc'`` represents the data stored in
        ``data['aa']['bb']['cc']``.

        Args:
            field_names: The list of field names.

        Returns:
            str: The ``.``-delimited field path.
        """
        return render_field_path(field_names)

    @staticmethod
    def write_option(
        **kwargs,
    ) -> Union[_helpers.ExistsOption, _helpers.LastUpdateOption]:
        """Create a write option for write operations.

        Write operations include :meth:`~google.cloud.DocumentReference.set`,
        :meth:`~google.cloud.DocumentReference.update` and
        :meth:`~google.cloud.DocumentReference.delete`.

        One of the following keyword arguments must be provided:

        * ``last_update_time`` (:class:`google.protobuf.timestamp_pb2.Timestamp`):
          A timestamp. When set, the target document must
          exist and have been last updated at that time. Protobuf
          ``update_time`` timestamps are typically returned from methods
          that perform write operations as part of a "write result"
          protobuf or directly.
        * ``exists`` (:class:`bool`): Indicates if the document being modified
          should already exist.

        Providing no argument would make the option have no effect (so
        it is not allowed). Providing multiple would be an apparent
        contradiction, since ``last_update_time`` assumes that the
        document **was** updated (it can't have been updated if it
        doesn't exist) and ``exists`` indicates that it is unknown if the
        document exists or not.

        Args:
            kwargs (Dict[str, Any]): The keyword arguments described above.

        Raises:
            TypeError: If anything other than exactly one argument is
                provided by the caller, or if the single argument is not
                one of the names listed above.

        Returns:
            Union[ExistsOption, LastUpdateOption]: The option (from
            ``_helpers``) to be used to configure a write message.
        """
        if len(kwargs) != 1:
            raise TypeError(_BAD_OPTION_ERR)

        name, value = kwargs.popitem()
        if name == "last_update_time":
            return _helpers.LastUpdateOption(value)
        if name == "exists":
            return _helpers.ExistsOption(value)

        # Unknown keyword: the second exception argument names the offender.
        extra = f"{name!r} was provided"
        raise TypeError(_BAD_OPTION_ERR, extra)

    def _prep_get_all(
        self,
        references: list,
        field_paths: Iterable[str] | None = None,
        transaction: BaseTransaction | None = None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        read_time: datetime.datetime | None = None,
    ) -> Tuple[dict, dict, dict]:
        """Shared setup for async/sync :meth:`get_all`.

        Returns:
            Tuple[dict, dict, dict]: The request dict, the mapping of document
            paths to references, and the retry/timeout keyword arguments.
        """
        document_paths, reference_map = _reference_info(references)
        mask = _get_doc_mask(field_paths)
        request = {
            "database": self._database_string,
            "documents": document_paths,
            "mask": mask,
            "transaction": _helpers.get_transaction_id(transaction),
        }
        # ``read_time`` is only added to the request when explicitly given.
        if read_time is not None:
            request["read_time"] = read_time
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, reference_map, kwargs

    def get_all(
        self,
        references: list,
        field_paths: Iterable[str] | None = None,
        transaction=None,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ) -> Union[
        AsyncGenerator[DocumentSnapshot, Any], Generator[DocumentSnapshot, Any, Any]
    ]:
        """Not implemented on the base class."""
        raise NotImplementedError

    def _prep_collections(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        read_time: datetime.datetime | None = None,
    ) -> Tuple[dict, dict]:
        """Shared setup for async/sync :meth:`collections`.

        Returns:
            Tuple[dict, dict]: The request dict and the retry/timeout keyword
            arguments.
        """
        request: dict[str, Any] = {
            "parent": f"{self._database_string}/documents",
        }
        # ``read_time`` is only added to the request when explicitly given.
        if read_time is not None:
            request["read_time"] = read_time
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    def collections(
        self,
        retry: retries.Retry | retries.AsyncRetry | object | None = None,
        timeout: float | None = None,
        *,
        read_time: datetime.datetime | None = None,
    ):
        """Not implemented on the base class."""
        raise NotImplementedError

    def batch(self) -> BaseWriteBatch:
        """Not implemented on the base class."""
        raise NotImplementedError

    def transaction(
        self, max_attempts: int = MAX_ATTEMPTS, read_only: bool = False
    ) -> BaseTransaction:
        """Not implemented on the base class."""
        raise NotImplementedError

    def pipeline(self) -> PipelineSource:
        """
        Start a pipeline with this client.

        Not implemented on the base class.

        Returns:
            :class:`~google.cloud.firestore_v1.pipeline_source.PipelineSource`:
                A pipeline that uses this client
        """
        raise NotImplementedError

    @property
    def _pipeline_cls(self) -> Type["_BasePipeline"]:
        """Not implemented on the base class."""
        raise NotImplementedError

521 

522 

523def _reference_info(references: list) -> Tuple[list, dict]: 

524 """Get information about document references. 

525 

526 Helper for :meth:`~google.cloud.firestore_v1.client.Client.get_all`. 

527 

528 Args: 

529 references (List[.DocumentReference, ...]): Iterable of document 

530 references. 

531 

532 Returns: 

533 Tuple[List[str, ...], Dict[str, .DocumentReference]]: A two-tuple of 

534 

535 * fully-qualified documents paths for each reference in ``references`` 

536 * a mapping from the paths to the original reference. (If multiple 

537 ``references`` contains multiple references to the same document, 

538 that key will be overwritten in the result.) 

539 """ 

540 document_paths = [] 

541 reference_map = {} 

542 for reference in references: 

543 doc_path = reference._document_path 

544 document_paths.append(doc_path) 

545 reference_map[doc_path] = reference 

546 

547 return document_paths, reference_map 

548 

549 

550def _get_reference(document_path: str, reference_map: dict) -> BaseDocumentReference: 

551 """Get a document reference from a dictionary. 

552 

553 This just wraps a simple dictionary look-up with a helpful error that is 

554 specific to :meth:`~google.cloud.firestore.client.Client.get_all`, the 

555 **public** caller of this function. 

556 

557 Args: 

558 document_path (str): A fully-qualified document path. 

559 reference_map (Dict[str, .DocumentReference]): A mapping (produced 

560 by :func:`_reference_info`) of fully-qualified document paths to 

561 document references. 

562 

563 Returns: 

564 .DocumentReference: The matching reference. 

565 

566 Raises: 

567 ValueError: If ``document_path`` has not been encountered. 

568 """ 

569 try: 

570 return reference_map[document_path] 

571 except KeyError: 

572 msg = _BAD_DOC_TEMPLATE.format(document_path) 

573 raise ValueError(msg) 

574 

575 

576def _parse_batch_get( 

577 get_doc_response: types.BatchGetDocumentsResponse, 

578 reference_map: dict, 

579 client: BaseClient, 

580) -> DocumentSnapshot: 

581 """Parse a `BatchGetDocumentsResponse` protobuf. 

582 

583 Args: 

584 get_doc_response (~google.cloud.firestore_v1.\ 

585 firestore.BatchGetDocumentsResponse): A single response (from 

586 a stream) containing the "get" response for a document. 

587 reference_map (Dict[str, .DocumentReference]): A mapping (produced 

588 by :func:`_reference_info`) of fully-qualified document paths to 

589 document references. 

590 client (:class:`~google.cloud.firestore_v1.client.Client`): 

591 A client that has a document factory. 

592 

593 Returns: 

594 [.DocumentSnapshot]: The retrieved snapshot. 

595 

596 Raises: 

597 ValueError: If the response has a ``result`` field (a oneof) other 

598 than ``found`` or ``missing``. 

599 """ 

600 result_type = get_doc_response._pb.WhichOneof("result") 

601 if result_type == "found": 

602 reference = _get_reference(get_doc_response.found.name, reference_map) 

603 data = _helpers.decode_dict(get_doc_response.found.fields, client) 

604 snapshot = DocumentSnapshot( 

605 reference, 

606 data, 

607 exists=True, 

608 read_time=get_doc_response.read_time, 

609 create_time=get_doc_response.found.create_time, 

610 update_time=get_doc_response.found.update_time, 

611 ) 

612 elif result_type == "missing": 

613 reference = _get_reference(get_doc_response.missing, reference_map) 

614 snapshot = DocumentSnapshot( 

615 reference, 

616 None, 

617 exists=False, 

618 read_time=get_doc_response.read_time, 

619 create_time=None, 

620 update_time=None, 

621 ) 

622 else: 

623 raise ValueError( 

624 "`BatchGetDocumentsResponse.result` (a oneof) had a field other " 

625 "than `found` or `missing` set, or was unset" 

626 ) 

627 return snapshot 

628 

629 

630def _get_doc_mask( 

631 field_paths: Iterable[str] | None, 

632) -> Optional[types.common.DocumentMask]: 

633 """Get a document mask if field paths are provided. 

634 

635 Args: 

636 field_paths (Optional[Iterable[str, ...]]): An iterable of field 

637 paths (``.``-delimited list of field names) to use as a 

638 projection of document fields in the returned results. 

639 

640 Returns: 

641 Optional[google.cloud.firestore_v1.types.common.DocumentMask]: A mask 

642 to project documents to a restricted set of field paths. 

643 """ 

644 if field_paths is None: 

645 return None 

646 else: 

647 return types.DocumentMask(field_paths=field_paths) 

648 

649 

650def _path_helper(path: tuple) -> Tuple[str]: 

651 """Standardize path into a tuple of path segments. 

652 

653 Args: 

654 path (Tuple[str, ...]): Can either be 

655 

656 * A single ``/``-delimited path 

657 * A tuple of path segments 

658 """ 

659 if len(path) == 1: 

660 return path[0].split(_helpers.DOCUMENT_PATH_DELIMITER) 

661 else: 

662 return path