Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/firestore_v1/base_client.py: 43%

171 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-09 06:27 +0000

1# Copyright 2017 Google LLC All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Client for interacting with the Google Cloud Firestore API. 

16 

17This is the base from which all interactions with the API occur. 

18 

19In the hierarchy of API concepts 

20 

21* a :class:`~google.cloud.firestore_v1.client.Client` owns a 

22 :class:`~google.cloud.firestore_v1.collection.CollectionReference` 

23* a :class:`~google.cloud.firestore_v1.client.Client` owns a 

24 :class:`~google.cloud.firestore_v1.document.DocumentReference` 

25""" 

26 

27import os 

28import grpc # type: ignore 

29 

30from google.auth.credentials import AnonymousCredentials 

31import google.api_core.client_options 

32import google.api_core.path_template 

33from google.api_core import retry as retries 

34from google.api_core.gapic_v1 import client_info 

35from google.cloud.client import ClientWithProject # type: ignore 

36 

37from google.cloud.firestore_v1 import _helpers 

38from google.cloud.firestore_v1 import __version__ 

39from google.cloud.firestore_v1 import types 

40from google.cloud.firestore_v1.base_document import DocumentSnapshot 

41 

42from google.cloud.firestore_v1.field_path import render_field_path 

43from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions 

44from typing import ( 

45 Any, 

46 AsyncGenerator, 

47 Generator, 

48 Iterable, 

49 List, 

50 Optional, 

51 Tuple, 

52 Union, 

53) 

54 

55# Types needed only for Type Hints 

56from google.cloud.firestore_v1.base_collection import BaseCollectionReference 

57from google.cloud.firestore_v1.base_document import BaseDocumentReference 

58from google.cloud.firestore_v1.base_transaction import BaseTransaction 

59from google.cloud.firestore_v1.base_batch import BaseWriteBatch 

60from google.cloud.firestore_v1.base_query import BaseQuery 

61 

62 

DEFAULT_DATABASE = "(default)"
"""str: The default database used in a :class:`~google.cloud.firestore_v1.client.Client`."""
# Project ID substituted by ``BaseClient.__init__`` when the emulator host is
# set and the caller supplied no project.
_DEFAULT_EMULATOR_PROJECT = "google-cloud-firestore-emulator"
# Message of the ``TypeError`` raised by ``BaseClient.write_option``.
_BAD_OPTION_ERR = (
    "Exactly one of ``last_update_time`` or ``exists`` must be provided."
)
# Template for the ``ValueError`` raised by ``_get_reference``; formatted with
# the offending document path.
_BAD_DOC_TEMPLATE: str = (
    "Document {!r} appeared in response but was not present among references"
)
# Transaction state messages (not referenced elsewhere in this module).
_ACTIVE_TXN: str = "There is already an active transaction."
_INACTIVE_TXN: str = "There is no active transaction."
# Default ``client_info`` argument of ``BaseClient.__init__``.
_CLIENT_INFO: Any = client_info.ClientInfo(client_library_version=__version__)
# Name of the environment variable read to discover the emulator address.
_FIRESTORE_EMULATOR_HOST: str = "FIRESTORE_EMULATOR_HOST"

76 

77 

class BaseClient(ClientWithProject):
    """Client for interacting with Google Cloud Firestore API.

    .. note::

        Since the Cloud Firestore API requires the gRPC transport, no
        ``_http`` argument is accepted by this class.

    Args:
        project (Optional[str]): The project which the client acts on behalf
            of. If not passed, falls back to the default inferred
            from the environment.
        credentials (Optional[~google.auth.credentials.Credentials]): The
            OAuth2 Credentials to use for this client. If not passed, falls
            back to the default inferred from the environment.
        database (Optional[str]): The database name that the client targets.
            A falsy value is replaced by :attr:`DEFAULT_DATABASE`.
        client_info (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
            The client info used to send a user-agent string along with API
            requests. If ``None``, then default info will be used. Generally,
            you only need to set this if you're developing your own library
            or partner tool.
        client_options (Union[dict, google.api_core.client_options.ClientOptions]):
            Client options used to set user options on the client. API Endpoint
            should be set through client_options.
    """

    SCOPE = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/datastore",
    )
    """The scopes required for authenticating with the Firestore service."""

    # Class-level defaults for the lazily computed values below; each is
    # replaced on the instance the first time the matching accessor runs.
    _firestore_api_internal = None
    _database_string_internal = None
    _rpc_metadata_internal = None

    def __init__(
        self,
        project=None,
        credentials=None,
        database=None,
        client_info=_CLIENT_INFO,
        client_options=None,
    ) -> None:
        database = database or DEFAULT_DATABASE
        # NOTE: This API has no use for the _http argument, but sending it
        #       will have no impact since the _http() @property only lazily
        #       creates a working HTTP object.
        self._emulator_host = os.getenv(_FIRESTORE_EMULATOR_HOST)

        # When the emulator variable is set, fill in anonymous credentials
        # and a placeholder project for anything the caller left out.
        if self._emulator_host is not None:
            if credentials is None:
                credentials = AnonymousCredentials()
            if project is None:
                project = _DEFAULT_EMULATOR_PROJECT

        super().__init__(
            project=project,
            credentials=credentials,
            client_options=client_options,
            _http=None,
        )
        self._client_info = client_info
        # Normalize a non-empty dict into a ClientOptions object; any other
        # value (including None or an empty dict) is stored as given.
        if client_options and isinstance(client_options, dict):
            client_options = google.api_core.client_options.from_dict(
                client_options
            )
        self._client_options = client_options

        self._database = database

    def _firestore_api_helper(self, transport, client_class, client_module) -> Any:
        """Lazy-loading getter GAPIC Firestore API.

        On first use this builds a channel, wraps it in ``transport``, creates
        ``client_class`` on top of it and caches the result; later calls
        return the cached client and ignore the arguments.

        Args:
            transport: Transport class; called as
                ``transport(host=..., channel=...)`` and, outside the
                emulator, used for ``transport.create_channel``.
            client_class: GAPIC client class; called with ``transport`` and
                ``client_options`` keyword arguments.
            client_module: Object whose ``_client_info`` attribute is set to
                this client's client info.

        Returns:
            The GAPIC client with the credentials of the current client.
        """
        if self._firestore_api_internal is not None:
            return self._firestore_api_internal

        # Use a custom channel.
        # We need this in order to set appropriate keepalive options.
        # NOTE(review): ``self._target`` is not defined in this class; it is
        # presumably supplied by the concrete client -- confirm.
        if self._emulator_host is not None:
            channel = self._emulator_channel(transport)
        else:
            channel = transport.create_channel(
                self._target,
                credentials=self._credentials,
                options={"grpc.keepalive_time_ms": 30000}.items(),
            )

        self._transport = transport(host=self._target, channel=channel)

        self._firestore_api_internal = client_class(
            transport=self._transport, client_options=self._client_options
        )
        client_module._client_info = self._client_info

        return self._firestore_api_internal

    def _emulator_channel(self, transport):
        """Create an insecure channel to communicate with the local emulator.

        A ``Bearer`` token entry is passed to the channel factory through its
        ``options`` argument. The token is the credentials' ``id_token`` when
        that attribute exists and is not ``None``, otherwise ``"owner"``.
        This supports local testing of firestore rules if the credentials
        have been created from a signed custom token.

        Args:
            transport: Transport class; its ``__name__`` selects between the
                asyncio and the synchronous channel factory.

        Returns:
            grpc.Channel or grpc.aio.Channel
        """
        # Insecure channels are used for the emulator as secure channels
        # cannot be used to communicate on some environments.
        # https://github.com/googleapis/python-firestore/issues/359
        # Default the token to a non-empty string, in this case "owner".
        token = "owner"
        id_token = getattr(self._credentials, "id_token", None)
        if id_token is not None:
            token = id_token
        options = [("Authorization", f"Bearer {token}")]

        if "GrpcAsyncIOTransport" in str(transport.__name__):
            return grpc.aio.insecure_channel(self._emulator_host, options=options)
        return grpc.insecure_channel(self._emulator_host, options=options)

    def _target_helper(self, client_class) -> str:
        """Return the target (where the API is).
        Eg. "firestore.googleapis.com"

        Precedence: the emulator host, then a truthy
        ``client_options.api_endpoint``, then ``client_class.DEFAULT_ENDPOINT``.

        Args:
            client_class: Class providing ``DEFAULT_ENDPOINT`` as the fallback.

        Returns:
            str: The location of the API.
        """
        if self._emulator_host is not None:
            return self._emulator_host
        if self._client_options and self._client_options.api_endpoint:
            return self._client_options.api_endpoint
        return client_class.DEFAULT_ENDPOINT

    @property
    def _database_string(self):
        """The database string corresponding to this client's project.

        This value is lazy-loaded and cached.

        Will be of the form

            ``projects/{project_id}/databases/{database_id}``

        Returns:
            str: The fully-qualified database string for the current
            project. (The database ID is also in this string.)
        """
        if self._database_string_internal is None:
            self._database_string_internal = google.api_core.path_template.expand(
                "projects/{project}/databases/{database}",
                project=self.project,
                database=self._database,
            )

        return self._database_string_internal

    @property
    def _rpc_metadata(self):
        """The RPC metadata for this client's associated database.

        Built once from :attr:`_database_string` and cached. When the emulator
        host is set, an ``("authorization", "Bearer owner")`` entry is
        appended as part of that one-time construction.

        Returns:
            Sequence[Tuple(str, str)]: RPC metadata with resource prefix
            for the database associated with this client.
        """
        if self._rpc_metadata_internal is None:
            metadata = _helpers.metadata_with_prefix(self._database_string)
            self._rpc_metadata_internal = metadata

            if self._emulator_host is not None:
                # The emulator requires additional metadata to be set.
                metadata.append(("authorization", "Bearer owner"))

        return self._rpc_metadata_internal

    def collection(self, *collection_path) -> BaseCollectionReference[BaseQuery]:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def collection_group(self, collection_id: str) -> BaseQuery:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def _get_collection_reference(
        self, collection_id: str
    ) -> BaseCollectionReference[BaseQuery]:
        """Checks validity of collection_id and then uses subclasses collection implementation.

        Args:
            collection_id (str) Identifies the collections to query over.

                Every collection or subcollection with this ID as the last segment of its
                path will be included. Cannot contain a slash.

        Returns:
            The created collection.

        Raises:
            ValueError: If ``collection_id`` contains ``/``.
        """
        if "/" in collection_id:
            raise ValueError(
                "Invalid collection_id "
                + collection_id
                + ". Collection IDs must not contain '/'."
            )

        return self.collection(collection_id)

    def document(self, *document_path) -> BaseDocumentReference:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def bulk_writer(self, options: Optional[BulkWriterOptions] = None) -> BulkWriter:
        """Get a BulkWriter instance from this client.

        Args:
            options (Optional[~google.cloud.firestore_v1.bulk_writer.BulkWriterOptions]):
                Optional control parameters for the
                :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter` returned.

        Returns:
            :class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`:
                A utility to efficiently create and save many `WriteBatch` instances
                to the server.
        """
        return BulkWriter(client=self, options=options)

    def _document_path_helper(self, *document_path) -> List[str]:
        """Standardize the format of path to a list of path segments and strip the database string from path if present.

        Args:
            document_path (Tuple[str, ...]): Can either be

                * A single ``/``-delimited path to a document
                * A tuple of document path segments

        Returns:
            List[str]: The path segments, without a leading
            ``{database_string}/documents/`` prefix.
        """
        delimiter = _helpers.DOCUMENT_PATH_DELIMITER
        prefix = self._database_string + "/documents/"
        joined_path = delimiter.join(_path_helper(document_path))
        if joined_path.startswith(prefix):
            joined_path = joined_path[len(prefix) :]
        return joined_path.split(delimiter)

    def recursive_delete(
        self,
        reference: Union[BaseCollectionReference[BaseQuery], BaseDocumentReference],
        bulk_writer: Optional["BulkWriter"] = None,  # type: ignore
    ) -> int:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    @staticmethod
    def field_path(*field_names: str) -> str:
        """Create a **field path** from a list of nested field names.

        A **field path** is a ``.``-delimited concatenation of the field
        names. It is used to represent a nested field. For example,
        in the data

        .. code-block:: python

           data = {
              'aa': {
                  'bb': {
                      'cc': 10,
                  },
              },
           }

        the field path ``'aa.bb.cc'`` represents the data stored in
        ``data['aa']['bb']['cc']``.

        Args:
            field_names: The list of field names.

        Returns:
            str: The ``.``-delimited field path.
        """
        return render_field_path(field_names)

    @staticmethod
    def write_option(
        **kwargs,
    ) -> Union[_helpers.ExistsOption, _helpers.LastUpdateOption]:
        """Create a write option for write operations.

        Write operations include :meth:`~google.cloud.DocumentReference.set`,
        :meth:`~google.cloud.DocumentReference.update` and
        :meth:`~google.cloud.DocumentReference.delete`.

        One of the following keyword arguments must be provided:

        * ``last_update_time`` (:class:`google.protobuf.timestamp_pb2.Timestamp`):
          A timestamp. When set, the target document must
          exist and have been last updated at that time. Protobuf
          ``update_time`` timestamps are typically returned from methods
          that perform write operations as part of a "write result"
          protobuf or directly.
        * ``exists`` (:class:`bool`): Indicates if the document being modified
          should already exist.

        Providing no argument would make the option have no effect (so
        it is not allowed). Providing multiple would be an apparent
        contradiction, since ``last_update_time`` assumes that the
        document **was** updated (it can't have been updated if it
        doesn't exist) and ``exists`` indicates that it is unknown if the
        document exists or not.

        Args:
            kwargs (Dict[str, Any]): The keyword arguments described above.

        Raises:
            TypeError: If anything other than exactly one argument is
                provided by the caller, or if that argument is neither
                ``last_update_time`` nor ``exists``.

        Returns:
            Union[ExistsOption, LastUpdateOption]:
            The option to be used to configure a write message.
        """
        if len(kwargs) != 1:
            raise TypeError(_BAD_OPTION_ERR)

        name, value = kwargs.popitem()
        if name == "last_update_time":
            return _helpers.LastUpdateOption(value)
        if name == "exists":
            return _helpers.ExistsOption(value)

        # Both strings are passed as separate exception args on purpose, so
        # callers inspecting ``exc.args`` keep seeing a two-element tuple.
        extra = "{!r} was provided".format(name)
        raise TypeError(_BAD_OPTION_ERR, extra)

    def _prep_get_all(
        self,
        references: list,
        field_paths: Optional[Iterable[str]] = None,
        transaction: Optional[BaseTransaction] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Tuple[dict, dict, dict]:
        """Shared setup for async/sync :meth:`get_all`.

        Returns:
            Tuple[dict, dict, dict]: The request dict (``database``,
            ``documents``, ``mask``, ``transaction``), the path-to-reference
            map from :func:`_reference_info`, and the keyword arguments
            produced by ``_helpers.make_retry_timeout_kwargs``.
        """
        document_paths, reference_map = _reference_info(references)
        request = {
            "database": self._database_string,
            "documents": document_paths,
            "mask": _get_doc_mask(field_paths),
            "transaction": _helpers.get_transaction_id(transaction),
        }
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, reference_map, kwargs

    def get_all(
        self,
        references: list,
        field_paths: Optional[Iterable[str]] = None,
        transaction: Optional[BaseTransaction] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Union[
        AsyncGenerator[DocumentSnapshot, Any], Generator[DocumentSnapshot, Any, Any]
    ]:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def _prep_collections(
        self,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Tuple[dict, dict]:
        """Shared setup for async/sync :meth:`collections`.

        Returns:
            Tuple[dict, dict]: The request dict, whose ``parent`` is
            ``{database_string}/documents``, and the keyword arguments
            produced by ``_helpers.make_retry_timeout_kwargs``.
        """
        request = {"parent": "{}/documents".format(self._database_string)}
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    def collections(
        self,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Union[
        AsyncGenerator[BaseCollectionReference[BaseQuery], Any],
        Generator[BaseCollectionReference[BaseQuery], Any, Any],
    ]:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def batch(self) -> BaseWriteBatch:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

    def transaction(self, **kwargs) -> BaseTransaction:
        """Not implemented on the base class; always raises."""
        raise NotImplementedError

474 

475 

def _reference_info(references: list) -> Tuple[list, dict]:
    """Get information about document references.

    Helper for :meth:`~google.cloud.firestore_v1.client.Client.get_all`.

    Args:
        references (List[.DocumentReference, ...]): Iterable of document
            references. Each item must expose a ``_document_path`` attribute.

    Returns:
        Tuple[List[str, ...], Dict[str, .DocumentReference]]: A two-tuple of

        * fully-qualified documents paths for each reference in ``references``
          (in input order, duplicates kept)
        * a mapping from the paths to the original reference. (If
          ``references`` contains multiple references to the same document,
          that key will be overwritten in the result, so the last one wins.)
    """
    document_paths = []
    reference_map = {}
    for reference in references:
        doc_path = reference._document_path
        document_paths.append(doc_path)
        reference_map[doc_path] = reference

    return document_paths, reference_map

501 

502 

def _get_reference(document_path: str, reference_map: dict) -> BaseDocumentReference:
    """Get a document reference from a dictionary.

    This just wraps a simple dictionary look-up with a helpful error that is
    specific to :meth:`~google.cloud.firestore.client.Client.get_all`, the
    **public** caller of this function.

    Args:
        document_path (str): A fully-qualified document path.
        reference_map (Dict[str, .DocumentReference]): A mapping (produced
            by :func:`_reference_info`) of fully-qualified document paths to
            document references.

    Returns:
        .DocumentReference: The matching reference.

    Raises:
        ValueError: If ``document_path`` has not been encountered.
    """
    try:
        return reference_map[document_path]
    except KeyError:
        # Translate the bare KeyError into a message naming the path.
        msg = _BAD_DOC_TEMPLATE.format(document_path)
        raise ValueError(msg)

527 

528 

def _parse_batch_get(
    get_doc_response: types.BatchGetDocumentsResponse,
    reference_map: dict,
    client: BaseClient,
) -> DocumentSnapshot:
    """Parse a `BatchGetDocumentsResponse` protobuf.

    Args:
        get_doc_response (~google.cloud.firestore_v1.types.BatchGetDocumentsResponse):
            A single response (from a stream) containing the "get" response
            for a document.
        reference_map (Dict[str, .DocumentReference]): A mapping (produced
            by :func:`_reference_info`) of fully-qualified document paths to
            document references.
        client (:class:`~google.cloud.firestore_v1.client.Client`):
            A client that has a document factory.

    Returns:
        [.DocumentSnapshot]: The retrieved snapshot. For a ``missing`` result
        the snapshot has ``exists=False``, no data and no create/update time.

    Raises:
        ValueError: If the response has a ``result`` field (a oneof) other
            than ``found`` or ``missing``, or if the returned document path
            is not a key of ``reference_map``.
    """
    result_type = get_doc_response._pb.WhichOneof("result")

    if result_type == "found":
        found = get_doc_response.found
        # Resolve the reference before decoding so an unknown path fails fast.
        reference = _get_reference(found.name, reference_map)
        data = _helpers.decode_dict(found.fields, client)
        return DocumentSnapshot(
            reference,
            data,
            exists=True,
            read_time=get_doc_response.read_time,
            create_time=found.create_time,
            update_time=found.update_time,
        )

    if result_type == "missing":
        # ``missing`` carries the document path itself.
        reference = _get_reference(get_doc_response.missing, reference_map)
        return DocumentSnapshot(
            reference,
            None,
            exists=False,
            read_time=get_doc_response.read_time,
            create_time=None,
            update_time=None,
        )

    raise ValueError(
        "`BatchGetDocumentsResponse.result` (a oneof) had a field other "
        "than `found` or `missing` set, or was unset"
    )

581 

582 

def _get_doc_mask(
    field_paths: Optional[Iterable[str]],
) -> Optional[types.common.DocumentMask]:
    """Get a document mask if field paths are provided.

    Args:
        field_paths (Optional[Iterable[str, ...]]): An iterable of field
            paths (``.``-delimited list of field names) to use as a
            projection of document fields in the returned results.

    Returns:
        Optional[google.cloud.firestore_v1.types.common.DocumentMask]: A mask
        to project documents to a restricted set of field paths, or ``None``
        when ``field_paths`` is ``None``.
    """
    if field_paths is None:
        return None
    return types.DocumentMask(field_paths=field_paths)

599 

600 

def _path_helper(path: tuple) -> Union[List[str], Tuple[str, ...]]:
    """Standardize path into a sequence of path segments.

    Args:
        path (Tuple[str, ...]): Can either be

            * A single ``/``-delimited path
            * A tuple of path segments

    Returns:
        Union[List[str], Tuple[str, ...]]: For a one-element ``path``, the
        list obtained by splitting that element on
        ``_helpers.DOCUMENT_PATH_DELIMITER``; otherwise ``path`` itself,
        returned unchanged.
    """
    if len(path) == 1:
        return path[0].split(_helpers.DOCUMENT_PATH_DELIMITER)
    return path