1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
17import warnings
18
19from google.api_core import gapic_v1, grpc_helpers_async
20from google.auth import credentials as ga_credentials # type: ignore
21from google.auth.transport.grpc import SslCredentials # type: ignore
22import grpc # type: ignore
23from grpc.experimental import aio # type: ignore
24
25from google.cloud.bigquery_storage_v1.types import storage, stream
26
27from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
28from .grpc import BigQueryWriteGrpcTransport
29
30
class BigQueryWriteGrpcAsyncIOTransport(BigQueryWriteTransport):
    """gRPC AsyncIO backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # The asyncio channel over which all RPCs are sent (assigned in __init__).
    _grpc_channel: aio.Channel
    # Lazily-built cache of RPC stub callables keyed by method name. This
    # class-level dict is shadowed by a per-instance dict in __init__, so
    # instances never share stubs through it.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): The API audience to use for the
                credentials; forwarded unchanged to the base transport.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        # Per-instance stub cache; shadows the class-level ``_stubs`` dict.
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed. ``False`` (rather
            # than ``None``) is a sentinel for the base transport —
            # NOTE(review): relies on the base class treating False as "do not
            # resolve default credentials"; confirm against the base transport.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            # No channel given: work out which SSL credentials to use for the
            # channel we will create after the base class resolves auth.
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        # Only create a channel if one was not passed in explicitly above.
        # This must run after super().__init__ so that ``self._host``,
        # ``self._credentials`` and ``self._scopes`` are populated.
        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], Awaitable[storage.AppendRowsResponse]]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating ``AppendRowsRequest`` was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    Awaitable[~.AppendRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self.grpc_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Awaitable[storage.FinalizeWriteStreamResponse],
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    Awaitable[~.FinalizeWriteStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Awaitable[storage.BatchCommitWriteStreamsResponse],
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    Awaitable[~.BatchCommitWriteStreamsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs["batch_commit_write_streams"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], Awaitable[storage.FlushRowsResponse]]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    Awaitable[~.FlushRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

    def close(self):
        """Close the transport's gRPC channel.

        Returns the value produced by ``self.grpc_channel.close()``;
        since this is the AsyncIO transport, callers should await it.
        """
        return self.grpc_channel.close()
458
459
# Public API of this module: only the transport class is exported.
__all__ = ("BigQueryWriteGrpcAsyncIOTransport",)