1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from typing import Callable, Dict, Optional, Sequence, Tuple, Union
17import warnings
18
19from google.api_core import gapic_v1, grpc_helpers
20import google.auth # type: ignore
21from google.auth import credentials as ga_credentials # type: ignore
22from google.auth.transport.grpc import SslCredentials # type: ignore
23import grpc # type: ignore
24
25from google.cloud.bigquery_storage_v1.types import storage, stream
26
27from .base import DEFAULT_CLIENT_INFO, BigQueryReadTransport
28
29
class BigQueryReadGrpcTransport(BigQueryReadTransport):
    """gRPC backend transport for BigQueryRead.

    BigQuery Read API.

    The Read API can be used to read data from BigQuery.

    The transport exposes the same RPC methods as the primary client, which
    lets the client delegate each call to this implementation.

    Protocol buffers are sent over gRPC (itself layered on HTTP/2), so the
    ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[grpc.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Build the transport and, unless one is supplied, its gRPC channel.

        Args:
            host (Optional[str]):
                Hostname to connect to (default:
                'bigquerystorage.googleapis.com'). Replaced by
                ``api_mtls_endpoint`` when that deprecated argument is given.
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials identifying the application. When
                omitted, the client attempts to find credentials in the
                environment. Ignored if ``channel`` is provided.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with :func:`google.auth.load_credentials_from_file`.
                Ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): OAuth scopes to request.
                Ignored if ``channel`` is provided.
            channel (Optional[grpc.Channel]): A ready-made channel to send
                calls through. When given, it is used as-is and all
                credential/SSL arguments are ignored.
            api_mtls_endpoint (Optional[str]): Deprecated. A mutual TLS
                endpoint that overrides ``host``. The channel's SSL
                credentials then come from ``client_cert_source`` or, failing
                that, from application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. Callback returning PEM-encoded certificate and
                private-key bytes. Only consulted when ``api_mtls_endpoint``
                is set.
            ssl_channel_credentials (Optional[grpc.ChannelCredentials]): SSL
                credentials for the channel. Ignored if ``channel`` is
                provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Callback returning PEM-encoded certificate and private-key
                bytes, used to configure a mutual TLS channel. Ignored if
                ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): Project to bill for usage and
                quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to build the user-agent string sent with
                requests. If ``None``, default info is used. Usually only
                relevant when developing your own client library.
            always_use_jwt_access (Optional[bool]): Whether self-signed JWTs
                should be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the
                base transport. Presumably the audience for self-signed JWT
                credentials; confirm in base.py.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        # Start with no channel and whatever SSL credentials the caller passed;
        # the branches below may replace either one.
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # A caller-supplied channel is used verbatim. Credentials are set
            # to False so they are ignored. Presumably the base transport
            # skips credential resolution for False; confirm in base.py.
            credentials = False
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated mTLS path: the mTLS endpoint replaces the host.
            host = api_mtls_endpoint
            if client_cert_source:
                cert_chain, private_key = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_chain, private_key=private_key
                )
            else:
                # No callback: fall back to application default SSL credentials.
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Build mTLS credentials from the callback only when the caller did
            # not hand over ready-made SSL credentials.
            cert_chain, private_key = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_chain, private_key=private_key
            )

        # The base transport records host, credentials and scopes
        # (later read back as self._host, self._credentials, self._scopes).
        super().__init__(
            host=host,
            scopes=scopes,
            credentials=credentials,
            credentials_file=credentials_file,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
            quota_project_id=quota_project_id,
            client_info=client_info,
        )

        if not self._grpc_channel:
            # A value of -1 removes gRPC's per-message size cap in both
            # directions (gRPC channel-argument convention).
            size_limit_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            # Reuse the credentials the base transport already stored.
            # credentials_file is therefore deliberately None here.
            self._grpc_channel = type(self).create_channel(
                self._host,
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=size_limit_options,
            )

        # Wrapping the RPC methods needs the channel, so this must run last.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create a gRPC channel configured for this service.

        Args:
            host (Optional[str]): Host the channel connects to.
            credentials (Optional[~.Credentials]): Authorization credentials
                identifying the application. When omitted, the client attempts
                to find credentials in the environment.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with :func:`google.auth.load_credentials_from_file`.
                Mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): An optional list of scopes for
                this service. Only used when credentials are not given, in
                which case they are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): Project to bill for usage and
                quota.
            kwargs (Optional[dict]): Extra keyword arguments forwarded to
                channel creation.

        Returns:
            grpc.Channel: The newly created channel.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        # The class-level AUTH_SCOPES / DEFAULT_HOST act as fallbacks.
        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )

    @property
    def grpc_channel(self) -> grpc.Channel:
        """The channel this transport sends RPCs through."""
        return self._grpc_channel

    def _memoized_stub(
        self,
        cache_key: str,
        call_kind: str,
        rpc_path: str,
        request_type,
        response_type,
    ) -> Callable:
        """Return the cached stub for ``cache_key``, creating it on first use.

        ``call_kind`` names the channel factory method to invoke
        (``"unary_unary"`` or ``"unary_stream"``). gRPC handles the
        (de)serialization itself, so the request type's ``serialize`` and
        the response type's ``deserialize`` are passed in. Both are looked
        up only when the stub is first built.
        """
        if cache_key not in self._stubs:
            make_stub = getattr(self.grpc_channel, call_kind)
            self._stubs[cache_key] = make_stub(
                rpc_path,
                request_serializer=request_type.serialize,
                response_deserializer=response_type.deserialize,
            )
        return self._stubs[cache_key]

    @property
    def create_read_session(
        self,
    ) -> Callable[[storage.CreateReadSessionRequest], stream.ReadSession]:
        r"""Return a callable for the create read session method over gRPC.

        Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        Returns:
            Callable[[~.CreateReadSessionRequest],
                    ~.ReadSession]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._memoized_stub(
            "create_read_session",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession",
            storage.CreateReadSessionRequest,
            stream.ReadSession,
        )

    @property
    def read_rows(
        self,
    ) -> Callable[[storage.ReadRowsRequest], storage.ReadRowsResponse]:
        r"""Return a callable for the read rows method over gRPC.

        Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        Returns:
            Callable[[~.ReadRowsRequest],
                    ~.ReadRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server (a server-streaming call).
        """
        return self._memoized_stub(
            "read_rows",
            "unary_stream",
            "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows",
            storage.ReadRowsRequest,
            storage.ReadRowsResponse,
        )

    @property
    def split_read_stream(
        self,
    ) -> Callable[[storage.SplitReadStreamRequest], storage.SplitReadStreamResponse]:
        r"""Return a callable for the split read stream method over gRPC.

        Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        Returns:
            Callable[[~.SplitReadStreamRequest],
                    ~.SplitReadStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._memoized_stub(
            "split_read_stream",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream",
            storage.SplitReadStreamRequest,
            storage.SplitReadStreamResponse,
        )

    def close(self):
        """Close the underlying gRPC channel."""
        self.grpc_channel.close()

    @property
    def kind(self) -> str:
        """Name of this transport type; always ``"grpc"``."""
        return "grpc"
358
359
# Public API of this module: only the gRPC transport class is exported.
__all__ = ("BigQueryReadGrpcTransport",)