1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import warnings
17from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
18
19from google.api_core import gapic_v1
20from google.api_core import grpc_helpers_async
21from google.auth import credentials as ga_credentials # type: ignore
22from google.auth.transport.grpc import SslCredentials # type: ignore
23
24import grpc # type: ignore
25from grpc.experimental import aio # type: ignore
26
27from google.cloud.bigquery_storage_v1.types import storage
28from google.cloud.bigquery_storage_v1.types import stream
29from .base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO
30from .grpc import BigQueryWriteGrpcTransport
31
32
class BigQueryWriteGrpcAsyncIOTransport(BigQueryWriteTransport):
    """gRPC AsyncIO backend transport for BigQueryWrite.

    BigQuery Write API.
    The Write API can be used to write data to BigQuery.
    For supplementary information about the Write API, see:
    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # Lazily-created AsyncIO channel shared by all stubs on this transport.
    _grpc_channel: aio.Channel
    # Cache of RPC-name -> stub callable; populated on first property access.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.

        Returns:
            aio.Channel: A gRPC AsyncIO channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): An optional override for the intended
                audience of the credentials (forwarded to the base transport,
                e.g. for self-signed JWT audiences).

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], Awaitable[storage.AppendRowsResponse]]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        -  For COMMITTED streams (which includes the default stream),
           data is visible immediately upon successful append.

        -  For BUFFERED streams, data is made visible via a subsequent
           ``FlushRows`` rpc which advances a cursor to a newer offset
           in the stream.

        -  For PENDING streams, data is not made visible until the
           stream itself is finalized (via the ``FinalizeWriteStream``
           rpc), and the stream is explicitly committed via the
           ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    Awaitable[~.AppendRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self.grpc_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Awaitable[storage.FinalizeWriteStreamResponse],
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    Awaitable[~.FinalizeWriteStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Awaitable[storage.BatchCommitWriteStreamsResponse],
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    Awaitable[~.BatchCommitWriteStreamsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs["batch_commit_write_streams"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], Awaitable[storage.FlushRowsResponse]]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    Awaitable[~.FlushRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

    def close(self):
        """Close the underlying gRPC AsyncIO channel.

        Returns:
            An awaitable that completes when the channel is closed;
            callers should ``await`` the returned value.
        """
        return self.grpc_channel.close()
457
458
# Public API surface of this module.
__all__ = ("BigQueryWriteGrpcAsyncIOTransport",)