1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.oauth2 import service_account # type: ignore
26
27from google.cloud.bigquery_storage_v1 import gapic_version as package_version
28from google.cloud.bigquery_storage_v1.types import storage, stream
29
30DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
31 gapic_version=package_version.__version__
32)
33
34
35class BigQueryWriteTransport(abc.ABC):
36 """Abstract transport class for BigQueryWrite."""
37
38 AUTH_SCOPES = (
39 "https://www.googleapis.com/auth/bigquery",
40 "https://www.googleapis.com/auth/bigquery.insertdata",
41 "https://www.googleapis.com/auth/cloud-platform",
42 )
43
44 DEFAULT_HOST: str = "bigquerystorage.googleapis.com"
45
46 def __init__(
47 self,
48 *,
49 host: str = DEFAULT_HOST,
50 credentials: Optional[ga_credentials.Credentials] = None,
51 credentials_file: Optional[str] = None,
52 scopes: Optional[Sequence[str]] = None,
53 quota_project_id: Optional[str] = None,
54 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
55 always_use_jwt_access: Optional[bool] = False,
56 api_audience: Optional[str] = None,
57 **kwargs,
58 ) -> None:
59 """Instantiate the transport.
60
61 Args:
62 host (Optional[str]):
63 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
64 credentials (Optional[google.auth.credentials.Credentials]): The
65 authorization credentials to attach to requests. These
66 credentials identify the application to the service; if none
67 are specified, the client will attempt to ascertain the
68 credentials from the environment.
69 credentials_file (Optional[str]): A file with credentials that can
70 be loaded with :func:`google.auth.load_credentials_from_file`.
71 This argument is mutually exclusive with credentials.
72 scopes (Optional[Sequence[str]]): A list of scopes.
73 quota_project_id (Optional[str]): An optional project to use for billing
74 and quota.
75 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
76 The client info used to send a user-agent string along with
77 API requests. If ``None``, then default info will be used.
78 Generally, you only need to set this if you're developing
79 your own client library.
80 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
81 be used for service account credentials.
82 """
83
84 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
85
86 # Save the scopes.
87 self._scopes = scopes
88
89 # If no credentials are provided, then determine the appropriate
90 # defaults.
91 if credentials and credentials_file:
92 raise core_exceptions.DuplicateCredentialArgs(
93 "'credentials_file' and 'credentials' are mutually exclusive"
94 )
95
96 if credentials_file is not None:
97 credentials, _ = google.auth.load_credentials_from_file(
98 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
99 )
100 elif credentials is None:
101 credentials, _ = google.auth.default(
102 **scopes_kwargs, quota_project_id=quota_project_id
103 )
104 # Don't apply audience if the credentials file passed from user.
105 if hasattr(credentials, "with_gdch_audience"):
106 credentials = credentials.with_gdch_audience(
107 api_audience if api_audience else host
108 )
109
110 # If the credentials are service account credentials, then always try to use self signed JWT.
111 if (
112 always_use_jwt_access
113 and isinstance(credentials, service_account.Credentials)
114 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
115 ):
116 credentials = credentials.with_always_use_jwt_access(True)
117
118 # Save the credentials.
119 self._credentials = credentials
120
121 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
122 if ":" not in host:
123 host += ":443"
124 self._host = host
125
126 @property
127 def host(self):
128 return self._host
129
130 def _prep_wrapped_messages(self, client_info):
131 # Precompute the wrapped methods.
132 self._wrapped_methods = {
133 self.create_write_stream: gapic_v1.method.wrap_method(
134 self.create_write_stream,
135 default_retry=retries.Retry(
136 initial=10.0,
137 maximum=120.0,
138 multiplier=1.3,
139 predicate=retries.if_exception_type(
140 core_exceptions.DeadlineExceeded,
141 core_exceptions.ResourceExhausted,
142 core_exceptions.ServiceUnavailable,
143 ),
144 deadline=1200.0,
145 ),
146 default_timeout=1200.0,
147 client_info=client_info,
148 ),
149 self.append_rows: gapic_v1.method.wrap_method(
150 self.append_rows,
151 default_retry=retries.Retry(
152 initial=0.1,
153 maximum=60.0,
154 multiplier=1.3,
155 predicate=retries.if_exception_type(
156 core_exceptions.ServiceUnavailable,
157 ),
158 deadline=86400.0,
159 ),
160 default_timeout=86400.0,
161 client_info=client_info,
162 ),
163 self.get_write_stream: gapic_v1.method.wrap_method(
164 self.get_write_stream,
165 default_retry=retries.Retry(
166 initial=0.1,
167 maximum=60.0,
168 multiplier=1.3,
169 predicate=retries.if_exception_type(
170 core_exceptions.DeadlineExceeded,
171 core_exceptions.ResourceExhausted,
172 core_exceptions.ServiceUnavailable,
173 ),
174 deadline=600.0,
175 ),
176 default_timeout=600.0,
177 client_info=client_info,
178 ),
179 self.finalize_write_stream: gapic_v1.method.wrap_method(
180 self.finalize_write_stream,
181 default_retry=retries.Retry(
182 initial=0.1,
183 maximum=60.0,
184 multiplier=1.3,
185 predicate=retries.if_exception_type(
186 core_exceptions.DeadlineExceeded,
187 core_exceptions.ResourceExhausted,
188 core_exceptions.ServiceUnavailable,
189 ),
190 deadline=600.0,
191 ),
192 default_timeout=600.0,
193 client_info=client_info,
194 ),
195 self.batch_commit_write_streams: gapic_v1.method.wrap_method(
196 self.batch_commit_write_streams,
197 default_retry=retries.Retry(
198 initial=0.1,
199 maximum=60.0,
200 multiplier=1.3,
201 predicate=retries.if_exception_type(
202 core_exceptions.DeadlineExceeded,
203 core_exceptions.ResourceExhausted,
204 core_exceptions.ServiceUnavailable,
205 ),
206 deadline=600.0,
207 ),
208 default_timeout=600.0,
209 client_info=client_info,
210 ),
211 self.flush_rows: gapic_v1.method.wrap_method(
212 self.flush_rows,
213 default_retry=retries.Retry(
214 initial=0.1,
215 maximum=60.0,
216 multiplier=1.3,
217 predicate=retries.if_exception_type(
218 core_exceptions.DeadlineExceeded,
219 core_exceptions.ResourceExhausted,
220 core_exceptions.ServiceUnavailable,
221 ),
222 deadline=600.0,
223 ),
224 default_timeout=600.0,
225 client_info=client_info,
226 ),
227 }
228
229 def close(self):
230 """Closes resources associated with the transport.
231
232 .. warning::
233 Only call this method if the transport is NOT shared
234 with other clients - this may cause errors in other clients!
235 """
236 raise NotImplementedError()
237
238 @property
239 def create_write_stream(
240 self,
241 ) -> Callable[
242 [storage.CreateWriteStreamRequest],
243 Union[stream.WriteStream, Awaitable[stream.WriteStream]],
244 ]:
245 raise NotImplementedError()
246
247 @property
248 def append_rows(
249 self,
250 ) -> Callable[
251 [storage.AppendRowsRequest],
252 Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]],
253 ]:
254 raise NotImplementedError()
255
256 @property
257 def get_write_stream(
258 self,
259 ) -> Callable[
260 [storage.GetWriteStreamRequest],
261 Union[stream.WriteStream, Awaitable[stream.WriteStream]],
262 ]:
263 raise NotImplementedError()
264
265 @property
266 def finalize_write_stream(
267 self,
268 ) -> Callable[
269 [storage.FinalizeWriteStreamRequest],
270 Union[
271 storage.FinalizeWriteStreamResponse,
272 Awaitable[storage.FinalizeWriteStreamResponse],
273 ],
274 ]:
275 raise NotImplementedError()
276
277 @property
278 def batch_commit_write_streams(
279 self,
280 ) -> Callable[
281 [storage.BatchCommitWriteStreamsRequest],
282 Union[
283 storage.BatchCommitWriteStreamsResponse,
284 Awaitable[storage.BatchCommitWriteStreamsResponse],
285 ],
286 ]:
287 raise NotImplementedError()
288
289 @property
290 def flush_rows(
291 self,
292 ) -> Callable[
293 [storage.FlushRowsRequest],
294 Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]],
295 ]:
296 raise NotImplementedError()
297
298 @property
299 def kind(self) -> str:
300 raise NotImplementedError()
301
302
303__all__ = ("BigQueryWriteTransport",)