1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19from google.cloud.bigquery_storage_v1 import gapic_version as package_version
20
21import google.auth # type: ignore
22import google.api_core
23from google.api_core import exceptions as core_exceptions
24from google.api_core import gapic_v1
25from google.api_core import retry as retries
26from google.auth import credentials as ga_credentials # type: ignore
27from google.oauth2 import service_account # type: ignore
28
29from google.cloud.bigquery_storage_v1.types import storage
30from google.cloud.bigquery_storage_v1.types import stream
31
# Default client metadata reported to the service (ends up in the
# ``x-goog-api-client`` user-agent header), pinned to this package's version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)
35
36
class BigQueryWriteTransport(abc.ABC):
    """Abstract transport class for BigQueryWrite.

    Concrete subclasses (e.g. gRPC / gRPC-asyncio transports) implement the
    RPC-method properties; this base class handles credential resolution and
    precomputes retry/timeout-wrapped method callables.
    """

    # OAuth scopes requested when resolving default credentials.
    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/bigquery.insertdata",
        "https://www.googleapis.com/auth/cloud-platform",
    )

    DEFAULT_HOST: str = "bigquerystorage.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): A list of scopes.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): The intended audience for GDC-H
                credentials; falls back to ``host`` when not provided.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )
            # Don't apply audience if the credentials file passed from user.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(
                    api_audience if api_audience else host
                )

        # If the credentials are service account credentials, then always try to use self signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host

    def _prep_wrapped_messages(self, client_info):
        """Precompute retry/timeout-wrapped versions of each RPC method.

        Populates ``self._wrapped_methods``, mapping each RPC-method property
        to a :func:`gapic_v1.method.wrap_method` wrapper carrying the
        service-configured retry policy and default timeout.
        """

        def _wrap(method, *, initial, maximum, timeout, errors):
            # All methods share multiplier=1.3 and keep the retry deadline
            # equal to the default timeout; only initial/maximum backoff,
            # overall timeout, and the retryable error set vary.
            return gapic_v1.method.wrap_method(
                method,
                default_retry=retries.Retry(
                    initial=initial,
                    maximum=maximum,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(*errors),
                    deadline=timeout,
                ),
                default_timeout=timeout,
                client_info=client_info,
            )

        # Errors most methods treat as transient (retryable).
        transient = (
            core_exceptions.DeadlineExceeded,
            core_exceptions.ServiceUnavailable,
        )

        self._wrapped_methods = {
            self.create_write_stream: _wrap(
                self.create_write_stream,
                initial=10.0,
                maximum=120.0,
                timeout=1200.0,
                # Stream creation additionally retries on quota exhaustion.
                errors=(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ResourceExhausted,
                    core_exceptions.ServiceUnavailable,
                ),
            ),
            self.append_rows: _wrap(
                self.append_rows,
                initial=0.1,
                maximum=60.0,
                # Long-running append stream: one-day timeout, retries only
                # on ServiceUnavailable.
                timeout=86400.0,
                errors=(core_exceptions.ServiceUnavailable,),
            ),
            self.get_write_stream: _wrap(
                self.get_write_stream,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
                errors=transient,
            ),
            self.finalize_write_stream: _wrap(
                self.finalize_write_stream,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
                errors=transient,
            ),
            self.batch_commit_write_streams: _wrap(
                self.batch_commit_write_streams,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
                errors=transient,
            ),
            self.flush_rows: _wrap(
                self.flush_rows,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
                errors=transient,
            ),
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
            Only call this method if the transport is NOT shared
            with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_write_stream(
        self,
    ) -> Callable[
        [storage.CreateWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        """Return the callable for the CreateWriteStream RPC."""
        raise NotImplementedError()

    @property
    def append_rows(
        self,
    ) -> Callable[
        [storage.AppendRowsRequest],
        Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]],
    ]:
        """Return the callable for the AppendRows RPC."""
        raise NotImplementedError()

    @property
    def get_write_stream(
        self,
    ) -> Callable[
        [storage.GetWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        """Return the callable for the GetWriteStream RPC."""
        raise NotImplementedError()

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Union[
            storage.FinalizeWriteStreamResponse,
            Awaitable[storage.FinalizeWriteStreamResponse],
        ],
    ]:
        """Return the callable for the FinalizeWriteStream RPC."""
        raise NotImplementedError()

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Union[
            storage.BatchCommitWriteStreamsResponse,
            Awaitable[storage.BatchCommitWriteStreamsResponse],
        ],
    ]:
        """Return the callable for the BatchCommitWriteStreams RPC."""
        raise NotImplementedError()

    @property
    def flush_rows(
        self,
    ) -> Callable[
        [storage.FlushRowsRequest],
        Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]],
    ]:
        """Return the callable for the FlushRows RPC."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """Return a short string identifying the transport kind (e.g. "grpc")."""
        raise NotImplementedError()
295
296
297__all__ = ("BigQueryWriteTransport",)