# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import warnings
from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union

from google.api_core import gapic_v1
from google.api_core import grpc_helpers_async
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore

import grpc  # type: ignore
from grpc.experimental import aio  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage
from google.cloud.bigquery_storage_v1.types import stream
from .base import BigQueryReadTransport, DEFAULT_CLIENT_INFO
from .grpc import BigQueryReadGrpcTransport


class BigQueryReadGrpcAsyncIOTransport(BigQueryReadTransport):
    """gRPC AsyncIO backend transport for BigQueryRead.

    BigQuery Read API.
    The Read API can be used to read data from BigQuery.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
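
    Example (a minimal usage sketch, not part of the generated surface; it
    assumes application default credentials are available in the environment):

    .. code-block:: python

        import asyncio

        from google.cloud.bigquery_storage_v1.services.big_query_read import (
            BigQueryReadAsyncClient,
        )

        async def main():
            # "grpc_asyncio" selects this transport class for the async client.
            client = BigQueryReadAsyncClient(transport="grpc_asyncio")
            # ... issue create_read_session / read_rows calls here ...

        asyncio.run(main())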
    """

    _grpc_channel: aio.Channel
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
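
        Example (a minimal sketch; relies on application default credentials
        and the ``AUTH_SCOPES`` defined on the base transport):

        .. code-block:: python

            channel = BigQueryReadGrpcAsyncIOTransport.create_channel(
                "bigquerystorage.googleapis.com",
                scopes=BigQueryReadGrpcAsyncIOTransport.AUTH_SCOPES,
            )
            transport = BigQueryReadGrpcAsyncIOTransport(channel=channel)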
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether a self-signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
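
        Example (a minimal sketch; assumes application default credentials
        are available in the environment):

        .. code-block:: python

            transport = BigQueryReadGrpcAsyncIOTransport(
                host="bigquerystorage.googleapis.com",
            )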
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_read_session(
        self,
    ) -> Callable[[storage.CreateReadSessionRequest], Awaitable[stream.ReadSession]]:
        r"""Return a callable for the create read session method over gRPC.

        Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.
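
        Example (a minimal sketch of calling the returned callable directly;
        normally the GAPIC client wraps this. ``transport`` is an instance of
        this class, and the project and table names are placeholders):

        .. code-block:: python

            from google.cloud.bigquery_storage_v1 import types

            request = types.CreateReadSessionRequest(
                parent="projects/my-project",
                read_session=types.ReadSession(
                    table="projects/my-project/datasets/my_dataset/tables/my_table",
                    data_format=types.DataFormat.AVRO,
                ),
                max_stream_count=1,
            )
            session = await transport.create_read_session(request)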

        Returns:
            Callable[[~.CreateReadSessionRequest],
                    Awaitable[~.ReadSession]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_read_session" not in self._stubs:
            self._stubs["create_read_session"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession",
                request_serializer=storage.CreateReadSessionRequest.serialize,
                response_deserializer=stream.ReadSession.deserialize,
            )
        return self._stubs["create_read_session"]

    @property
    def read_rows(
        self,
    ) -> Callable[[storage.ReadRowsRequest], Awaitable[storage.ReadRowsResponse]]:
        r"""Return a callable for the read rows method over gRPC.

        Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.
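
        Example (a minimal sketch of iterating the server-streaming call
        directly; ``transport`` is an instance of this class and
        ``stream_name`` is a ``ReadStream`` name from an existing session):

        .. code-block:: python

            from google.cloud.bigquery_storage_v1 import types

            request = types.ReadRowsRequest(read_stream=stream_name, offset=0)
            async for response in transport.read_rows(request):
                print(response.row_count)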

        Returns:
            Callable[[~.ReadRowsRequest],
                    Awaitable[~.ReadRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "read_rows" not in self._stubs:
            self._stubs["read_rows"] = self.grpc_channel.unary_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows",
                request_serializer=storage.ReadRowsRequest.serialize,
                response_deserializer=storage.ReadRowsResponse.deserialize,
            )
        return self._stubs["read_rows"]

    @property
    def split_read_stream(
        self,
    ) -> Callable[
        [storage.SplitReadStreamRequest], Awaitable[storage.SplitReadStreamResponse]
    ]:
        r"""Return a callable for the split read stream method over gRPC.

        Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.
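
        Example (a minimal sketch of calling the returned callable directly;
        ``transport`` is an instance of this class and ``stream_name`` is the
        resource name of an existing ``ReadStream``):

        .. code-block:: python

            from google.cloud.bigquery_storage_v1 import types

            request = types.SplitReadStreamRequest(name=stream_name, fraction=0.5)
            response = await transport.split_read_stream(request)
            primary = response.primary_stream
            residual = response.remainder_stream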

        Returns:
            Callable[[~.SplitReadStreamRequest],
                    Awaitable[~.SplitReadStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "split_read_stream" not in self._stubs:
            self._stubs["split_read_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream",
                request_serializer=storage.SplitReadStreamRequest.serialize,
                response_deserializer=storage.SplitReadStreamResponse.deserialize,
            )
        return self._stubs["split_read_stream"]

    def close(self):
        return self.grpc_channel.close()


__all__ = ("BigQueryReadGrpcAsyncIOTransport",)