1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19from google.cloud.bigquery_storage_v1 import gapic_version as package_version
20
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
28
29from google.cloud.bigquery_storage_v1.types import storage
30from google.cloud.bigquery_storage_v1.types import stream
31
32DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
33 gapic_version=package_version.__version__
34)
35
36
37class BigQueryReadTransport(abc.ABC):
38 """Abstract transport class for BigQueryRead."""
39
40 AUTH_SCOPES = (
41 "https://www.googleapis.com/auth/bigquery",
42 "https://www.googleapis.com/auth/cloud-platform",
43 )
44
45 DEFAULT_HOST: str = "bigquerystorage.googleapis.com"
46
47 def __init__(
48 self,
49 *,
50 host: str = DEFAULT_HOST,
51 credentials: Optional[ga_credentials.Credentials] = None,
52 credentials_file: Optional[str] = None,
53 scopes: Optional[Sequence[str]] = None,
54 quota_project_id: Optional[str] = None,
55 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
56 always_use_jwt_access: Optional[bool] = False,
57 api_audience: Optional[str] = None,
58 **kwargs,
59 ) -> None:
60 """Instantiate the transport.
61
62 Args:
63 host (Optional[str]):
64 The hostname to connect to.
65 credentials (Optional[google.auth.credentials.Credentials]): The
66 authorization credentials to attach to requests. These
67 credentials identify the application to the service; if none
68 are specified, the client will attempt to ascertain the
69 credentials from the environment.
70 credentials_file (Optional[str]): A file with credentials that can
71 be loaded with :func:`google.auth.load_credentials_from_file`.
72 This argument is mutually exclusive with credentials.
73 scopes (Optional[Sequence[str]]): A list of scopes.
74 quota_project_id (Optional[str]): An optional project to use for billing
75 and quota.
76 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
77 The client info used to send a user-agent string along with
78 API requests. If ``None``, then default info will be used.
79 Generally, you only need to set this if you're developing
80 your own client library.
81 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
82 be used for service account credentials.
83 """
84
85 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
86
87 # Save the scopes.
88 self._scopes = scopes
89
90 # If no credentials are provided, then determine the appropriate
91 # defaults.
92 if credentials and credentials_file:
93 raise core_exceptions.DuplicateCredentialArgs(
94 "'credentials_file' and 'credentials' are mutually exclusive"
95 )
96
97 if credentials_file is not None:
98 credentials, _ = google.auth.load_credentials_from_file(
99 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
100 )
101 elif credentials is None:
102 credentials, _ = google.auth.default(
103 **scopes_kwargs, quota_project_id=quota_project_id
104 )
105 # Don't apply audience if the credentials file passed from user.
106 if hasattr(credentials, "with_gdch_audience"):
107 credentials = credentials.with_gdch_audience(
108 api_audience if api_audience else host
109 )
110
111 # If the credentials are service account credentials, then always try to use self signed JWT.
112 if (
113 always_use_jwt_access
114 and isinstance(credentials, service_account.Credentials)
115 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
116 ):
117 credentials = credentials.with_always_use_jwt_access(True)
118
119 # Save the credentials.
120 self._credentials = credentials
121
122 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
123 if ":" not in host:
124 host += ":443"
125 self._host = host
126
127 def _prep_wrapped_messages(self, client_info):
128 # Precompute the wrapped methods.
129 self._wrapped_methods = {
130 self.create_read_session: gapic_v1.method.wrap_method(
131 self.create_read_session,
132 default_retry=retries.Retry(
133 initial=0.1,
134 maximum=60.0,
135 multiplier=1.3,
136 predicate=retries.if_exception_type(
137 core_exceptions.DeadlineExceeded,
138 core_exceptions.ServiceUnavailable,
139 ),
140 deadline=600.0,
141 ),
142 default_timeout=600.0,
143 client_info=client_info,
144 ),
145 self.read_rows: gapic_v1.method.wrap_method(
146 self.read_rows,
147 default_retry=retries.Retry(
148 initial=0.1,
149 maximum=60.0,
150 multiplier=1.3,
151 predicate=retries.if_exception_type(
152 core_exceptions.ServiceUnavailable,
153 ),
154 deadline=86400.0,
155 ),
156 default_timeout=86400.0,
157 client_info=client_info,
158 ),
159 self.split_read_stream: gapic_v1.method.wrap_method(
160 self.split_read_stream,
161 default_retry=retries.Retry(
162 initial=0.1,
163 maximum=60.0,
164 multiplier=1.3,
165 predicate=retries.if_exception_type(
166 core_exceptions.DeadlineExceeded,
167 core_exceptions.ServiceUnavailable,
168 ),
169 deadline=600.0,
170 ),
171 default_timeout=600.0,
172 client_info=client_info,
173 ),
174 }
175
176 def close(self):
177 """Closes resources associated with the transport.
178
179 .. warning::
180 Only call this method if the transport is NOT shared
181 with other clients - this may cause errors in other clients!
182 """
183 raise NotImplementedError()
184
185 @property
186 def create_read_session(
187 self,
188 ) -> Callable[
189 [storage.CreateReadSessionRequest],
190 Union[stream.ReadSession, Awaitable[stream.ReadSession]],
191 ]:
192 raise NotImplementedError()
193
194 @property
195 def read_rows(
196 self,
197 ) -> Callable[
198 [storage.ReadRowsRequest],
199 Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]],
200 ]:
201 raise NotImplementedError()
202
203 @property
204 def split_read_stream(
205 self,
206 ) -> Callable[
207 [storage.SplitReadStreamRequest],
208 Union[
209 storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse]
210 ],
211 ]:
212 raise NotImplementedError()
213
214 @property
215 def kind(self) -> str:
216 raise NotImplementedError()
217
218
219__all__ = ("BigQueryReadTransport",)