1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.oauth2 import service_account # type: ignore
26import google.protobuf
27
28from google.cloud.bigquery_storage_v1 import gapic_version as package_version
29from google.cloud.bigquery_storage_v1.types import storage, stream
30
31DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
32 gapic_version=package_version.__version__
33)
34
35if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
36 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
37
38
39class BigQueryReadTransport(abc.ABC):
40 """Abstract transport class for BigQueryRead."""
41
42 AUTH_SCOPES = (
43 "https://www.googleapis.com/auth/bigquery",
44 "https://www.googleapis.com/auth/cloud-platform",
45 )
46
47 DEFAULT_HOST: str = "bigquerystorage.googleapis.com"
48
49 def __init__(
50 self,
51 *,
52 host: str = DEFAULT_HOST,
53 credentials: Optional[ga_credentials.Credentials] = None,
54 credentials_file: Optional[str] = None,
55 scopes: Optional[Sequence[str]] = None,
56 quota_project_id: Optional[str] = None,
57 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
58 always_use_jwt_access: Optional[bool] = False,
59 api_audience: Optional[str] = None,
60 **kwargs,
61 ) -> None:
62 """Instantiate the transport.
63
64 Args:
65 host (Optional[str]):
66 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
67 credentials (Optional[google.auth.credentials.Credentials]): The
68 authorization credentials to attach to requests. These
69 credentials identify the application to the service; if none
70 are specified, the client will attempt to ascertain the
71 credentials from the environment.
72 credentials_file (Optional[str]): A file with credentials that can
73 be loaded with :func:`google.auth.load_credentials_from_file`.
74 This argument is mutually exclusive with credentials.
75 scopes (Optional[Sequence[str]]): A list of scopes.
76 quota_project_id (Optional[str]): An optional project to use for billing
77 and quota.
78 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
79 The client info used to send a user-agent string along with
80 API requests. If ``None``, then default info will be used.
81 Generally, you only need to set this if you're developing
82 your own client library.
83 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
84 be used for service account credentials.
85 """
86
87 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
88
89 # Save the scopes.
90 self._scopes = scopes
91 if not hasattr(self, "_ignore_credentials"):
92 self._ignore_credentials: bool = False
93
94 # If no credentials are provided, then determine the appropriate
95 # defaults.
96 if credentials and credentials_file:
97 raise core_exceptions.DuplicateCredentialArgs(
98 "'credentials_file' and 'credentials' are mutually exclusive"
99 )
100
101 if credentials_file is not None:
102 credentials, _ = google.auth.load_credentials_from_file(
103 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
104 )
105 elif credentials is None and not self._ignore_credentials:
106 credentials, _ = google.auth.default(
107 **scopes_kwargs, quota_project_id=quota_project_id
108 )
109 # Don't apply audience if the credentials file passed from user.
110 if hasattr(credentials, "with_gdch_audience"):
111 credentials = credentials.with_gdch_audience(
112 api_audience if api_audience else host
113 )
114
115 # If the credentials are service account credentials, then always try to use self signed JWT.
116 if (
117 always_use_jwt_access
118 and isinstance(credentials, service_account.Credentials)
119 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
120 ):
121 credentials = credentials.with_always_use_jwt_access(True)
122
123 # Save the credentials.
124 self._credentials = credentials
125
126 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
127 if ":" not in host:
128 host += ":443"
129 self._host = host
130
131 @property
132 def host(self):
133 return self._host
134
135 def _prep_wrapped_messages(self, client_info):
136 # Precompute the wrapped methods.
137 self._wrapped_methods = {
138 self.create_read_session: gapic_v1.method.wrap_method(
139 self.create_read_session,
140 default_retry=retries.Retry(
141 initial=0.1,
142 maximum=60.0,
143 multiplier=1.3,
144 predicate=retries.if_exception_type(
145 core_exceptions.DeadlineExceeded,
146 core_exceptions.ServiceUnavailable,
147 ),
148 deadline=600.0,
149 ),
150 default_timeout=600.0,
151 client_info=client_info,
152 ),
153 self.read_rows: gapic_v1.method.wrap_method(
154 self.read_rows,
155 default_retry=retries.Retry(
156 initial=0.1,
157 maximum=60.0,
158 multiplier=1.3,
159 predicate=retries.if_exception_type(
160 core_exceptions.ServiceUnavailable,
161 ),
162 deadline=86400.0,
163 ),
164 default_timeout=86400.0,
165 client_info=client_info,
166 ),
167 self.split_read_stream: gapic_v1.method.wrap_method(
168 self.split_read_stream,
169 default_retry=retries.Retry(
170 initial=0.1,
171 maximum=60.0,
172 multiplier=1.3,
173 predicate=retries.if_exception_type(
174 core_exceptions.DeadlineExceeded,
175 core_exceptions.ServiceUnavailable,
176 ),
177 deadline=600.0,
178 ),
179 default_timeout=600.0,
180 client_info=client_info,
181 ),
182 }
183
184 def close(self):
185 """Closes resources associated with the transport.
186
187 .. warning::
188 Only call this method if the transport is NOT shared
189 with other clients - this may cause errors in other clients!
190 """
191 raise NotImplementedError()
192
193 @property
194 def create_read_session(
195 self,
196 ) -> Callable[
197 [storage.CreateReadSessionRequest],
198 Union[stream.ReadSession, Awaitable[stream.ReadSession]],
199 ]:
200 raise NotImplementedError()
201
202 @property
203 def read_rows(
204 self,
205 ) -> Callable[
206 [storage.ReadRowsRequest],
207 Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]],
208 ]:
209 raise NotImplementedError()
210
211 @property
212 def split_read_stream(
213 self,
214 ) -> Callable[
215 [storage.SplitReadStreamRequest],
216 Union[
217 storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse]
218 ],
219 ]:
220 raise NotImplementedError()
221
222 @property
223 def kind(self) -> str:
224 raise NotImplementedError()
225
226
227__all__ = ("BigQueryReadTransport",)