1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.oauth2 import service_account # type: ignore
26import google.protobuf
27
28from google.cloud.bigquery_storage_v1 import gapic_version as package_version
29from google.cloud.bigquery_storage_v1.types import storage, stream
30
# Client metadata used as the default ``client_info`` argument of the
# transport constructor; it carries this package's GAPIC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__,
)

# Record the protobuf runtime version as well, but only when this ClientInfo
# object exposes the attribute, so nothing new is attached to objects that
# lack it.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
37
38
class BigQueryWriteTransport(abc.ABC):
    """Base transport for the BigQueryWrite service.

    The constructor resolves credentials and normalizes the host string.
    ``close``, ``kind`` and every RPC property raise ``NotImplementedError``
    in this class.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/bigquery.insertdata",
        "https://www.googleapis.com/auth/cloud-platform",
    )

    DEFAULT_HOST: str = "bigquerystorage.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Set up credentials and the target host for the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to
                (default: 'bigquerystorage.googleapis.com'). ``":443"`` is
                appended when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]):
                Credentials to store on the transport. When omitted, and no
                ``credentials_file`` is given, they are obtained from
                :func:`google.auth.default` unless ``_ignore_credentials``
                is set on the instance.
            credentials_file (Optional[str]): Path handed to
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): Scopes forwarded to the
                credential loaders; ``AUTH_SCOPES`` is forwarded as the
                default scopes.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota; forwarded to the credential loaders.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Not read by this constructor. ``_prep_wrapped_messages``
                takes the client info as its own argument.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience passed to
                ``with_gdch_audience`` when environment-default credentials
                expose that method; ``host`` is used when this is empty.
            **kwargs: Accepted and not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Keep the caller's scopes exactly as they were passed in.
        self._scopes = scopes

        # Leave the flag alone if it already exists on the instance;
        # otherwise credentials are looked up as usual.
        if not hasattr(self, "_ignore_credentials"):
            self._ignore_credentials: bool = False

        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file,
                scopes=scopes,
                default_scopes=self.AUTH_SCOPES,
                quota_project_id=quota_project_id,
            )
        elif credentials is None and not self._ignore_credentials:
            credentials, _ = google.auth.default(
                scopes=scopes,
                default_scopes=self.AUTH_SCOPES,
                quota_project_id=quota_project_id,
            )
            # The audience is applied on this branch only: credentials that
            # came from a user-supplied file or object are left untouched.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Switch service account credentials to self signed JWT when asked
        # to and when the installed auth library offers the method.
        use_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if use_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Default to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """The host string stored by the constructor."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Build ``self._wrapped_methods`` with default retry and timeout.

        Each RPC callable is mapped to the result of
        ``gapic_v1.method.wrap_method`` for that callable.

        Args:
            client_info: Passed through to every ``wrap_method`` call.
        """
        wrap = gapic_v1.method.wrap_method
        transient_errors = (
            core_exceptions.DeadlineExceeded,
            core_exceptions.ResourceExhausted,
            core_exceptions.ServiceUnavailable,
        )

        def with_defaults(
            rpc, *, initial, maximum, timeout, retry_on=transient_errors
        ):
            # Every RPC uses the same backoff multiplier, and its retry
            # deadline equals its default timeout. A fresh Retry and
            # predicate are created for each RPC.
            policy = retries.Retry(
                initial=initial,
                maximum=maximum,
                multiplier=1.3,
                predicate=retries.if_exception_type(*retry_on),
                deadline=timeout,
            )
            return wrap(
                rpc,
                default_retry=policy,
                default_timeout=timeout,
                client_info=client_info,
            )

        self._wrapped_methods = {
            self.create_write_stream: with_defaults(
                self.create_write_stream,
                initial=10.0,
                maximum=120.0,
                timeout=1200.0,
            ),
            self.append_rows: with_defaults(
                self.append_rows,
                initial=0.1,
                maximum=60.0,
                timeout=86400.0,
                # append_rows retries on ServiceUnavailable only.
                retry_on=(core_exceptions.ServiceUnavailable,),
            ),
            self.get_write_stream: with_defaults(
                self.get_write_stream,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
            ),
            self.finalize_write_stream: with_defaults(
                self.finalize_write_stream,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
            ),
            self.batch_commit_write_streams: with_defaults(
                self.batch_commit_write_streams,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
            ),
            self.flush_rows: with_defaults(
                self.flush_rows,
                initial=0.1,
                maximum=60.0,
                timeout=600.0,
            ),
        }

    def close(self):
        """Closes resources associated with the transport.

        Not implemented in this base class.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    @property
    def create_write_stream(
        self,
    ) -> Callable[
        [storage.CreateWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        """Callable for the create-write-stream RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def append_rows(
        self,
    ) -> Callable[
        [storage.AppendRowsRequest],
        Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]],
    ]:
        """Callable for the append-rows RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def get_write_stream(
        self,
    ) -> Callable[
        [storage.GetWriteStreamRequest],
        Union[stream.WriteStream, Awaitable[stream.WriteStream]],
    ]:
        """Callable for the get-write-stream RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Union[
            storage.FinalizeWriteStreamResponse,
            Awaitable[storage.FinalizeWriteStreamResponse],
        ],
    ]:
        """Callable for the finalize-write-stream RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Union[
            storage.BatchCommitWriteStreamsResponse,
            Awaitable[storage.BatchCommitWriteStreamsResponse],
        ],
    ]:
        """Callable for the batch-commit-write-streams RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def flush_rows(
        self,
    ) -> Callable[
        [storage.FlushRowsRequest],
        Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]],
    ]:
        """Callable for the flush-rows RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String identifying the transport; not implemented here."""
        raise NotImplementedError()
307
308
# Public names of this module: only the abstract transport class.
__all__ = ("BigQueryWriteTransport",)