1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
17import warnings
18
19from google.api_core import gapic_v1, grpc_helpers_async
20from google.auth import credentials as ga_credentials # type: ignore
21from google.auth.transport.grpc import SslCredentials # type: ignore
22import grpc # type: ignore
23from grpc.experimental import aio # type: ignore
24
25from google.cloud.bigquery_storage_v1.types import storage, stream
26
27from .base import DEFAULT_CLIENT_INFO, BigQueryReadTransport
28from .grpc import BigQueryReadGrpcTransport
29
30
class BigQueryReadGrpcAsyncIOTransport(BigQueryReadTransport):
    """gRPC AsyncIO backend transport for BigQueryRead.

    BigQuery Read API.

    The Read API can be used to read data from BigQuery.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    # Class-level fallback only: ``__init__`` always rebinds ``_stubs`` to a
    # fresh per-instance dict, so stubs bound to one transport's channel are
    # not shared with other transport instances.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed
                for this service. These are only used when credentials are not
                specified and are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation (for example ``ssl_credentials`` or
                ``options``).

        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """
        # The class attributes AUTH_SCOPES and DEFAULT_HOST (defined on the
        # base transport) are handed to the helper as its defaults, next to
        # the caller-supplied ``scopes`` and ``host``.
        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed
                for this service. These are only used when credentials are not
                specified and are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls. When given, it is used as-is and no new
                channel is created.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (Optional[grpc.ChannelCredentials]): SSL
                credentials for the grpc channel. It is ignored if ``channel``
                is provided, and replaced when ``api_mtls_endpoint`` is set.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport constructor. Presumably the intended audience for
                authentication flows that need one — confirm against the base
                transport.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        # Per-instance stub cache; the stubs are bound to this instance's channel.
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed. ``False`` (not
            # ``None``) is handed to the base transport so that it can tell
            # "deliberately no credentials" apart from "look them up".
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                # Deprecated path: the mTLS endpoint replaces ``host``.
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                # Explicit ``ssl_channel_credentials`` win over the mTLS callback.
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                # -1 lifts gRPC's default message size limits; ReadRows
                # responses may be up to 100 MiB each.
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Return the gRPC AsyncIO channel used by this transport.

        The channel is supplied or created once in ``__init__``; this
        property does not create anything and returns that same instance
        on every access.
        """
        return self._grpc_channel

    @property
    def create_read_session(
        self,
    ) -> Callable[[storage.CreateReadSessionRequest], Awaitable[stream.ReadSession]]:
        r"""Return a callable for the create read session method over gRPC.

        Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        Returns:
            Callable[[~.CreateReadSessionRequest],
                    Awaitable[~.ReadSession]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request. It is cached so the stub is built only once per
        # transport instance.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_read_session" not in self._stubs:
            self._stubs["create_read_session"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession",
                request_serializer=storage.CreateReadSessionRequest.serialize,
                response_deserializer=stream.ReadSession.deserialize,
            )
        return self._stubs["create_read_session"]

    @property
    def read_rows(
        self,
    ) -> Callable[[storage.ReadRowsRequest], Awaitable[storage.ReadRowsResponse]]:
        r"""Return a callable for the read rows method over gRPC.

        Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        Returns:
            Callable[[~.ReadRowsRequest],
                    Awaitable[~.ReadRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request. This RPC is server-streaming, hence ``unary_stream``.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "read_rows" not in self._stubs:
            self._stubs["read_rows"] = self.grpc_channel.unary_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows",
                request_serializer=storage.ReadRowsRequest.serialize,
                response_deserializer=storage.ReadRowsResponse.deserialize,
            )
        return self._stubs["read_rows"]

    @property
    def split_read_stream(
        self,
    ) -> Callable[
        [storage.SplitReadStreamRequest], Awaitable[storage.SplitReadStreamResponse]
    ]:
        r"""Return a callable for the split read stream method over gRPC.

        Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        Returns:
            Callable[[~.SplitReadStreamRequest],
                    Awaitable[~.SplitReadStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request, caching it for subsequent accesses.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "split_read_stream" not in self._stubs:
            self._stubs["split_read_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream",
                request_serializer=storage.SplitReadStreamRequest.serialize,
                response_deserializer=storage.SplitReadStreamResponse.deserialize,
            )
        return self._stubs["split_read_stream"]

    def close(self):
        """Close the underlying gRPC AsyncIO channel.

        Returns:
            The awaitable produced by ``aio.Channel.close()``; callers are
            expected to await it for the close to take effect.
        """
        return self.grpc_channel.close()
359
360
# Public surface of this module: only the AsyncIO transport class is exported.
__all__ = (
    "BigQueryReadGrpcAsyncIOTransport",
)