1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import abc
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union
18
19import google.api_core
20from google.api_core import exceptions as core_exceptions
21from google.api_core import gapic_v1
22from google.api_core import retry as retries
23import google.auth # type: ignore
24from google.auth import credentials as ga_credentials # type: ignore
25from google.oauth2 import service_account # type: ignore
26
27from google.cloud.bigquery_storage_v1 import gapic_version as package_version
28from google.cloud.bigquery_storage_v1.types import storage, stream
29
30DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
31 gapic_version=package_version.__version__
32)
33
34
35class BigQueryReadTransport(abc.ABC):
36 """Abstract transport class for BigQueryRead."""
37
38 AUTH_SCOPES = (
39 "https://www.googleapis.com/auth/bigquery",
40 "https://www.googleapis.com/auth/cloud-platform",
41 )
42
43 DEFAULT_HOST: str = "bigquerystorage.googleapis.com"
44
45 def __init__(
46 self,
47 *,
48 host: str = DEFAULT_HOST,
49 credentials: Optional[ga_credentials.Credentials] = None,
50 credentials_file: Optional[str] = None,
51 scopes: Optional[Sequence[str]] = None,
52 quota_project_id: Optional[str] = None,
53 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
54 always_use_jwt_access: Optional[bool] = False,
55 api_audience: Optional[str] = None,
56 **kwargs,
57 ) -> None:
58 """Instantiate the transport.
59
60 Args:
61 host (Optional[str]):
62 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
63 credentials (Optional[google.auth.credentials.Credentials]): The
64 authorization credentials to attach to requests. These
65 credentials identify the application to the service; if none
66 are specified, the client will attempt to ascertain the
67 credentials from the environment.
68 credentials_file (Optional[str]): A file with credentials that can
69 be loaded with :func:`google.auth.load_credentials_from_file`.
70 This argument is mutually exclusive with credentials.
71 scopes (Optional[Sequence[str]]): A list of scopes.
72 quota_project_id (Optional[str]): An optional project to use for billing
73 and quota.
74 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
75 The client info used to send a user-agent string along with
76 API requests. If ``None``, then default info will be used.
77 Generally, you only need to set this if you're developing
78 your own client library.
79 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
80 be used for service account credentials.
81 """
82
83 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}
84
85 # Save the scopes.
86 self._scopes = scopes
87
88 # If no credentials are provided, then determine the appropriate
89 # defaults.
90 if credentials and credentials_file:
91 raise core_exceptions.DuplicateCredentialArgs(
92 "'credentials_file' and 'credentials' are mutually exclusive"
93 )
94
95 if credentials_file is not None:
96 credentials, _ = google.auth.load_credentials_from_file(
97 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
98 )
99 elif credentials is None:
100 credentials, _ = google.auth.default(
101 **scopes_kwargs, quota_project_id=quota_project_id
102 )
103 # Don't apply audience if the credentials file passed from user.
104 if hasattr(credentials, "with_gdch_audience"):
105 credentials = credentials.with_gdch_audience(
106 api_audience if api_audience else host
107 )
108
109 # If the credentials are service account credentials, then always try to use self signed JWT.
110 if (
111 always_use_jwt_access
112 and isinstance(credentials, service_account.Credentials)
113 and hasattr(service_account.Credentials, "with_always_use_jwt_access")
114 ):
115 credentials = credentials.with_always_use_jwt_access(True)
116
117 # Save the credentials.
118 self._credentials = credentials
119
120 # Save the hostname. Default to port 443 (HTTPS) if none is specified.
121 if ":" not in host:
122 host += ":443"
123 self._host = host
124
125 @property
126 def host(self):
127 return self._host
128
129 def _prep_wrapped_messages(self, client_info):
130 # Precompute the wrapped methods.
131 self._wrapped_methods = {
132 self.create_read_session: gapic_v1.method.wrap_method(
133 self.create_read_session,
134 default_retry=retries.Retry(
135 initial=0.1,
136 maximum=60.0,
137 multiplier=1.3,
138 predicate=retries.if_exception_type(
139 core_exceptions.DeadlineExceeded,
140 core_exceptions.ServiceUnavailable,
141 ),
142 deadline=600.0,
143 ),
144 default_timeout=600.0,
145 client_info=client_info,
146 ),
147 self.read_rows: gapic_v1.method.wrap_method(
148 self.read_rows,
149 default_retry=retries.Retry(
150 initial=0.1,
151 maximum=60.0,
152 multiplier=1.3,
153 predicate=retries.if_exception_type(
154 core_exceptions.ServiceUnavailable,
155 ),
156 deadline=86400.0,
157 ),
158 default_timeout=86400.0,
159 client_info=client_info,
160 ),
161 self.split_read_stream: gapic_v1.method.wrap_method(
162 self.split_read_stream,
163 default_retry=retries.Retry(
164 initial=0.1,
165 maximum=60.0,
166 multiplier=1.3,
167 predicate=retries.if_exception_type(
168 core_exceptions.DeadlineExceeded,
169 core_exceptions.ServiceUnavailable,
170 ),
171 deadline=600.0,
172 ),
173 default_timeout=600.0,
174 client_info=client_info,
175 ),
176 }
177
178 def close(self):
179 """Closes resources associated with the transport.
180
181 .. warning::
182 Only call this method if the transport is NOT shared
183 with other clients - this may cause errors in other clients!
184 """
185 raise NotImplementedError()
186
187 @property
188 def create_read_session(
189 self,
190 ) -> Callable[
191 [storage.CreateReadSessionRequest],
192 Union[stream.ReadSession, Awaitable[stream.ReadSession]],
193 ]:
194 raise NotImplementedError()
195
196 @property
197 def read_rows(
198 self,
199 ) -> Callable[
200 [storage.ReadRowsRequest],
201 Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]],
202 ]:
203 raise NotImplementedError()
204
205 @property
206 def split_read_stream(
207 self,
208 ) -> Callable[
209 [storage.SplitReadStreamRequest],
210 Union[
211 storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse]
212 ],
213 ]:
214 raise NotImplementedError()
215
216 @property
217 def kind(self) -> str:
218 raise NotImplementedError()
219
220
221__all__ = ("BigQueryReadTransport",)