1# -*- coding: utf-8 -*-
2# Copyright 2023 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19from google.cloud.firestore_v1 import gapic_version as package_version
20
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
28
29from google.cloud.firestore_v1.types import document
30from google.cloud.firestore_v1.types import document as gf_document
31from google.cloud.firestore_v1.types import firestore
32from google.cloud.location import locations_pb2 # type: ignore
33from google.longrunning import operations_pb2 # type: ignore
34from google.protobuf import empty_pb2 # type: ignore
35
36DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
37 gapic_version=package_version.__version__
38)
39
40
41class FirestoreTransport(abc.ABC):
42 """Abstract transport class for Firestore."""
43
44 AUTH_SCOPES = (
45 "https://www.googleapis.com/auth/cloud-platform",
46 "https://www.googleapis.com/auth/datastore",
47 )
48
49 DEFAULT_HOST: str = "firestore.googleapis.com"
50
51 def __init__(
52 self,
53 *,
54 host: str = DEFAULT_HOST,
55 credentials: Optional[ga_credentials.Credentials] = None,
56 credentials_file: Optional[str] = None,
57 scopes: Optional[Sequence[str]] = None,
58 quota_project_id: Optional[str] = None,
59 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
60 always_use_jwt_access: Optional[bool] = False,
61 api_audience: Optional[str] = None,
62 **kwargs,
63 ) -> None:
64 """Instantiate the transport.
65
66 Args:
67 host (Optional[str]):
68 The hostname to connect to.
69 credentials (Optional[google.auth.credentials.Credentials]): The
70 authorization credentials to attach to requests. These
71 credentials identify the application to the service; if none
72 are specified, the client will attempt to ascertain the
73 credentials from the environment.
74 credentials_file (Optional[str]): A file with credentials that can
75 be loaded with :func:`google.auth.load_credentials_from_file`.
76 This argument is mutually exclusive with credentials.
77 scopes (Optional[Sequence[str]]): A list of scopes.
78 quota_project_id (Optional[str]): An optional project to use for billing
79 and quota.
80 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
81 The client info used to send a user-agent string along with
82 API requests. If ``None``, then default info will be used.
83 Generally, you only need to set this if you're developing
84 your own client library.
85 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
86 be used for service account credentials.
87 """
88
89 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
90
91 # Save the scopes.
92 self._scopes = scopes
93
94 # If no credentials are provided, then determine the appropriate
95 # defaults.
96 if credentials and credentials_file:
97 raise core_exceptions.DuplicateCredentialArgs(
98 "'credentials_file' and 'credentials' are mutually exclusive"
99 )
100
101 if credentials_file is not None:
102 credentials, _ = google.auth.load_credentials_from_file(
103 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
104 )
105 elif credentials is None:
106 credentials, _ = google.auth.default(
107 **scopes_kwargs, quota_project_id=quota_project_id
108 )
109 # Don't apply audience if the credentials file passed from user.
110 if hasattr(credentials, "with_gdch_audience"):
111 credentials = credentials.with_gdch_audience(
112 api_audience if api_audience else host
113 )
114
115 # If the credentials are service account credentials, then always try to use self signed JWT.
116 if (
117 always_use_jwt_access
118 and isinstance(credentials, service_account.Credentials)
119 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
120 ):
121 credentials = credentials.with_always_use_jwt_access(True)
122
123 # Save the credentials.
124 self._credentials = credentials
125
126 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
127 if ":" not in host:
128 host += ":443"
129 self._host = host
130
131 def _prep_wrapped_messages(self, client_info):
132 # Precompute the wrapped methods.
133 self._wrapped_methods = {
134 self.get_document: gapic_v1.method.wrap_method(
135 self.get_document,
136 default_retry=retries.Retry(
137 initial=0.1,
138 maximum=60.0,
139 multiplier=1.3,
140 predicate=retries.if_exception_type(
141 core_exceptions.DeadlineExceeded,
142 core_exceptions.InternalServerError,
143 core_exceptions.ResourceExhausted,
144 core_exceptions.ServiceUnavailable,
145 ),
146 deadline=60.0,
147 ),
148 default_timeout=60.0,
149 client_info=client_info,
150 ),
151 self.list_documents: gapic_v1.method.wrap_method(
152 self.list_documents,
153 default_retry=retries.Retry(
154 initial=0.1,
155 maximum=60.0,
156 multiplier=1.3,
157 predicate=retries.if_exception_type(
158 core_exceptions.DeadlineExceeded,
159 core_exceptions.InternalServerError,
160 core_exceptions.ResourceExhausted,
161 core_exceptions.ServiceUnavailable,
162 ),
163 deadline=60.0,
164 ),
165 default_timeout=60.0,
166 client_info=client_info,
167 ),
168 self.update_document: gapic_v1.method.wrap_method(
169 self.update_document,
170 default_retry=retries.Retry(
171 initial=0.1,
172 maximum=60.0,
173 multiplier=1.3,
174 predicate=retries.if_exception_type(
175 core_exceptions.ResourceExhausted,
176 core_exceptions.ServiceUnavailable,
177 ),
178 deadline=60.0,
179 ),
180 default_timeout=60.0,
181 client_info=client_info,
182 ),
183 self.delete_document: gapic_v1.method.wrap_method(
184 self.delete_document,
185 default_retry=retries.Retry(
186 initial=0.1,
187 maximum=60.0,
188 multiplier=1.3,
189 predicate=retries.if_exception_type(
190 core_exceptions.DeadlineExceeded,
191 core_exceptions.InternalServerError,
192 core_exceptions.ResourceExhausted,
193 core_exceptions.ServiceUnavailable,
194 ),
195 deadline=60.0,
196 ),
197 default_timeout=60.0,
198 client_info=client_info,
199 ),
200 self.batch_get_documents: gapic_v1.method.wrap_method(
201 self.batch_get_documents,
202 default_retry=retries.Retry(
203 initial=0.1,
204 maximum=60.0,
205 multiplier=1.3,
206 predicate=retries.if_exception_type(
207 core_exceptions.DeadlineExceeded,
208 core_exceptions.InternalServerError,
209 core_exceptions.ResourceExhausted,
210 core_exceptions.ServiceUnavailable,
211 ),
212 deadline=300.0,
213 ),
214 default_timeout=300.0,
215 client_info=client_info,
216 ),
217 self.begin_transaction: gapic_v1.method.wrap_method(
218 self.begin_transaction,
219 default_retry=retries.Retry(
220 initial=0.1,
221 maximum=60.0,
222 multiplier=1.3,
223 predicate=retries.if_exception_type(
224 core_exceptions.DeadlineExceeded,
225 core_exceptions.InternalServerError,
226 core_exceptions.ResourceExhausted,
227 core_exceptions.ServiceUnavailable,
228 ),
229 deadline=60.0,
230 ),
231 default_timeout=60.0,
232 client_info=client_info,
233 ),
234 self.commit: gapic_v1.method.wrap_method(
235 self.commit,
236 default_retry=retries.Retry(
237 initial=0.1,
238 maximum=60.0,
239 multiplier=1.3,
240 predicate=retries.if_exception_type(
241 core_exceptions.ResourceExhausted,
242 core_exceptions.ServiceUnavailable,
243 ),
244 deadline=60.0,
245 ),
246 default_timeout=60.0,
247 client_info=client_info,
248 ),
249 self.rollback: gapic_v1.method.wrap_method(
250 self.rollback,
251 default_retry=retries.Retry(
252 initial=0.1,
253 maximum=60.0,
254 multiplier=1.3,
255 predicate=retries.if_exception_type(
256 core_exceptions.DeadlineExceeded,
257 core_exceptions.InternalServerError,
258 core_exceptions.ResourceExhausted,
259 core_exceptions.ServiceUnavailable,
260 ),
261 deadline=60.0,
262 ),
263 default_timeout=60.0,
264 client_info=client_info,
265 ),
266 self.run_query: gapic_v1.method.wrap_method(
267 self.run_query,
268 default_retry=retries.Retry(
269 initial=0.1,
270 maximum=60.0,
271 multiplier=1.3,
272 predicate=retries.if_exception_type(
273 core_exceptions.DeadlineExceeded,
274 core_exceptions.InternalServerError,
275 core_exceptions.ResourceExhausted,
276 core_exceptions.ServiceUnavailable,
277 ),
278 deadline=300.0,
279 ),
280 default_timeout=300.0,
281 client_info=client_info,
282 ),
283 self.run_aggregation_query: gapic_v1.method.wrap_method(
284 self.run_aggregation_query,
285 default_retry=retries.Retry(
286 initial=0.1,
287 maximum=60.0,
288 multiplier=1.3,
289 predicate=retries.if_exception_type(
290 core_exceptions.DeadlineExceeded,
291 core_exceptions.InternalServerError,
292 core_exceptions.ResourceExhausted,
293 core_exceptions.ServiceUnavailable,
294 ),
295 deadline=300.0,
296 ),
297 default_timeout=300.0,
298 client_info=client_info,
299 ),
300 self.partition_query: gapic_v1.method.wrap_method(
301 self.partition_query,
302 default_retry=retries.Retry(
303 initial=0.1,
304 maximum=60.0,
305 multiplier=1.3,
306 predicate=retries.if_exception_type(
307 core_exceptions.DeadlineExceeded,
308 core_exceptions.InternalServerError,
309 core_exceptions.ResourceExhausted,
310 core_exceptions.ServiceUnavailable,
311 ),
312 deadline=300.0,
313 ),
314 default_timeout=300.0,
315 client_info=client_info,
316 ),
317 self.write: gapic_v1.method.wrap_method(
318 self.write,
319 default_timeout=86400.0,
320 client_info=client_info,
321 ),
322 self.listen: gapic_v1.method.wrap_method(
323 self.listen,
324 default_retry=retries.Retry(
325 initial=0.1,
326 maximum=60.0,
327 multiplier=1.3,
328 predicate=retries.if_exception_type(
329 core_exceptions.DeadlineExceeded,
330 core_exceptions.InternalServerError,
331 core_exceptions.ResourceExhausted,
332 core_exceptions.ServiceUnavailable,
333 ),
334 deadline=86400.0,
335 ),
336 default_timeout=86400.0,
337 client_info=client_info,
338 ),
339 self.list_collection_ids: gapic_v1.method.wrap_method(
340 self.list_collection_ids,
341 default_retry=retries.Retry(
342 initial=0.1,
343 maximum=60.0,
344 multiplier=1.3,
345 predicate=retries.if_exception_type(
346 core_exceptions.DeadlineExceeded,
347 core_exceptions.InternalServerError,
348 core_exceptions.ResourceExhausted,
349 core_exceptions.ServiceUnavailable,
350 ),
351 deadline=60.0,
352 ),
353 default_timeout=60.0,
354 client_info=client_info,
355 ),
356 self.batch_write: gapic_v1.method.wrap_method(
357 self.batch_write,
358 default_retry=retries.Retry(
359 initial=0.1,
360 maximum=60.0,
361 multiplier=1.3,
362 predicate=retries.if_exception_type(
363 core_exceptions.Aborted,
364 core_exceptions.ResourceExhausted,
365 core_exceptions.ServiceUnavailable,
366 ),
367 deadline=60.0,
368 ),
369 default_timeout=60.0,
370 client_info=client_info,
371 ),
372 self.create_document: gapic_v1.method.wrap_method(
373 self.create_document,
374 default_retry=retries.Retry(
375 initial=0.1,
376 maximum=60.0,
377 multiplier=1.3,
378 predicate=retries.if_exception_type(
379 core_exceptions.ResourceExhausted,
380 core_exceptions.ServiceUnavailable,
381 ),
382 deadline=60.0,
383 ),
384 default_timeout=60.0,
385 client_info=client_info,
386 ),
387 }
388
389 def close(self):
390 """Closes resources associated with the transport.
391
392 .. warning::
393 Only call this method if the transport is NOT shared
394 with other clients - this may cause errors in other clients!
395 """
396 raise NotImplementedError()
397
398 @property
399 def get_document(
400 self,
401 ) -> Callable[
402 [firestore.GetDocumentRequest],
403 Union[document.Document, Awaitable[document.Document]],
404 ]:
405 raise NotImplementedError()
406
407 @property
408 def list_documents(
409 self,
410 ) -> Callable[
411 [firestore.ListDocumentsRequest],
412 Union[
413 firestore.ListDocumentsResponse, Awaitable[firestore.ListDocumentsResponse]
414 ],
415 ]:
416 raise NotImplementedError()
417
418 @property
419 def update_document(
420 self,
421 ) -> Callable[
422 [firestore.UpdateDocumentRequest],
423 Union[gf_document.Document, Awaitable[gf_document.Document]],
424 ]:
425 raise NotImplementedError()
426
427 @property
428 def delete_document(
429 self,
430 ) -> Callable[
431 [firestore.DeleteDocumentRequest],
432 Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
433 ]:
434 raise NotImplementedError()
435
436 @property
437 def batch_get_documents(
438 self,
439 ) -> Callable[
440 [firestore.BatchGetDocumentsRequest],
441 Union[
442 firestore.BatchGetDocumentsResponse,
443 Awaitable[firestore.BatchGetDocumentsResponse],
444 ],
445 ]:
446 raise NotImplementedError()
447
448 @property
449 def begin_transaction(
450 self,
451 ) -> Callable[
452 [firestore.BeginTransactionRequest],
453 Union[
454 firestore.BeginTransactionResponse,
455 Awaitable[firestore.BeginTransactionResponse],
456 ],
457 ]:
458 raise NotImplementedError()
459
460 @property
461 def commit(
462 self,
463 ) -> Callable[
464 [firestore.CommitRequest],
465 Union[firestore.CommitResponse, Awaitable[firestore.CommitResponse]],
466 ]:
467 raise NotImplementedError()
468
469 @property
470 def rollback(
471 self,
472 ) -> Callable[
473 [firestore.RollbackRequest], Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]]
474 ]:
475 raise NotImplementedError()
476
477 @property
478 def run_query(
479 self,
480 ) -> Callable[
481 [firestore.RunQueryRequest],
482 Union[firestore.RunQueryResponse, Awaitable[firestore.RunQueryResponse]],
483 ]:
484 raise NotImplementedError()
485
486 @property
487 def run_aggregation_query(
488 self,
489 ) -> Callable[
490 [firestore.RunAggregationQueryRequest],
491 Union[
492 firestore.RunAggregationQueryResponse,
493 Awaitable[firestore.RunAggregationQueryResponse],
494 ],
495 ]:
496 raise NotImplementedError()
497
498 @property
499 def partition_query(
500 self,
501 ) -> Callable[
502 [firestore.PartitionQueryRequest],
503 Union[
504 firestore.PartitionQueryResponse,
505 Awaitable[firestore.PartitionQueryResponse],
506 ],
507 ]:
508 raise NotImplementedError()
509
510 @property
511 def write(
512 self,
513 ) -> Callable[
514 [firestore.WriteRequest],
515 Union[firestore.WriteResponse, Awaitable[firestore.WriteResponse]],
516 ]:
517 raise NotImplementedError()
518
519 @property
520 def listen(
521 self,
522 ) -> Callable[
523 [firestore.ListenRequest],
524 Union[firestore.ListenResponse, Awaitable[firestore.ListenResponse]],
525 ]:
526 raise NotImplementedError()
527
528 @property
529 def list_collection_ids(
530 self,
531 ) -> Callable[
532 [firestore.ListCollectionIdsRequest],
533 Union[
534 firestore.ListCollectionIdsResponse,
535 Awaitable[firestore.ListCollectionIdsResponse],
536 ],
537 ]:
538 raise NotImplementedError()
539
540 @property
541 def batch_write(
542 self,
543 ) -> Callable[
544 [firestore.BatchWriteRequest],
545 Union[firestore.BatchWriteResponse, Awaitable[firestore.BatchWriteResponse]],
546 ]:
547 raise NotImplementedError()
548
549 @property
550 def create_document(
551 self,
552 ) -> Callable[
553 [firestore.CreateDocumentRequest],
554 Union[document.Document, Awaitable[document.Document]],
555 ]:
556 raise NotImplementedError()
557
558 @property
559 def list_operations(
560 self,
561 ) -> Callable[
562 [operations_pb2.ListOperationsRequest],
563 Union[
564 operations_pb2.ListOperationsResponse,
565 Awaitable[operations_pb2.ListOperationsResponse],
566 ],
567 ]:
568 raise NotImplementedError()
569
570 @property
571 def get_operation(
572 self,
573 ) -> Callable[
574 [operations_pb2.GetOperationRequest],
575 Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
576 ]:
577 raise NotImplementedError()
578
579 @property
580 def cancel_operation(
581 self,
582 ) -> Callable[[operations_pb2.CancelOperationRequest], None,]:
583 raise NotImplementedError()
584
585 @property
586 def delete_operation(
587 self,
588 ) -> Callable[[operations_pb2.DeleteOperationRequest], None,]:
589 raise NotImplementedError()
590
591 @property
592 def kind(self) -> str:
593 raise NotImplementedError()
594
595
596__all__ = ("FirestoreTransport",)