1# -*- coding: utf-8 -*-
2# Copyright 2022 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import warnings
17from typing import Callable, Dict, Optional, Sequence, Tuple, Union
18
19from google.api_core import grpc_helpers
20from google.api_core import gapic_v1
21import google.auth # type: ignore
22from google.auth import credentials as ga_credentials # type: ignore
23from google.auth.transport.grpc import SslCredentials # type: ignore
24
25import grpc # type: ignore
26
27from google.cloud.bigquery_storage_v1.types import storage
28from google.cloud.bigquery_storage_v1.types import stream
29from .base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO
30
31
class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """Synchronous gRPC transport for the BigQuery Write API.

    The Write API is used to write data to BigQuery. Background material
    is available at https://cloud.google.com/bigquery/docs/write-api

    Each RPC is exposed as a property carrying the same name as the
    matching method on the primary client, which lets the client load
    this transport and call straight through it.

    Protocol buffers are sent over gRPC (itself layered on HTTP/2), so
    the ``grpcio`` package has to be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[grpc.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Set up the transport and, unless one is supplied, its channel.

        Args:
            host (Optional[str]):
                Hostname to connect to. Replaced by ``api_mtls_endpoint``
                when that argument is given and no ``channel`` is passed.
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials attached to requests; they
                identify the application to the service. When omitted,
                the client tries to determine credentials from the
                environment. Disregarded when ``channel`` is provided.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with
                :func:`google.auth.load_credentials_from_file`. It is
                forwarded to the base transport and is never passed on
                to channel creation.
            scopes (Optional[Sequence[str]]): Scopes forwarded to the
                base transport. Not used for channel creation when
                ``channel`` is provided.
            channel (Optional[grpc.Channel]): An existing ``Channel``
                through which calls are made. When given, no new channel
                is created.
            api_mtls_endpoint (Optional[str]): Deprecated. Mutual TLS
                endpoint. When given (and no ``channel`` is passed) it
                overrides ``host`` and a mutual TLS channel is attempted
                using client SSL credentials from ``client_cert_source``
                or the application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. Callback returning the client certificate
                bytes and private key bytes, both PEM encoded. Only
                consulted when ``api_mtls_endpoint`` is set.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the gRPC channel. Disregarded when
                ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Callback returning the client certificate bytes and
                private key bytes, both PEM encoded, used to configure a
                mutual TLS channel. Only consulted when none of
                ``channel``, ``api_mtls_endpoint`` and
                ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to send a user-agent string with API
                requests. Usually only relevant when developing your own
                client library.
            always_use_jwt_access (Optional[bool]): Whether a self signed
                JWT should be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the
                base transport constructor.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If creating the
                mutual TLS transport fails for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        self._stubs: Dict[str, Callable] = {}
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials

        # Both legacy mTLS arguments still work but are flagged on use.
        deprecated_arguments = (
            (api_mtls_endpoint, "api_mtls_endpoint is deprecated"),
            (client_cert_source, "client_cert_source is deprecated"),
        )
        for supplied_value, notice in deprecated_arguments:
            if supplied_value:
                warnings.warn(notice, DeprecationWarning)

        if channel:
            # A caller-supplied channel wins: credentials are ignored and
            # no SSL channel credentials are kept.
            # NOTE(review): presumably ``False`` rather than ``None`` so the
            # base transport does not look up default credentials - confirm
            # against the base class.
            credentials = False
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated path: the mTLS endpoint replaces the host and the
            # SSL credentials come from the legacy callback or, failing
            # that, from the application default SSL credentials.
            host = api_mtls_endpoint
            if client_cert_source:
                cert_pem, key_pem = client_cert_source()
                self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=cert_pem, private_key=key_pem
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Explicit ``ssl_channel_credentials`` take precedence over the
            # certificate callback.
            cert_pem, key_pem = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert_pem, private_key=key_pem
            )

        # Host, credentials and scopes are recorded by the base transport.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            # The credentials saved on the instance are reused here, which
            # is why ``credentials_file`` is deliberately passed as None.
            self._grpc_channel = type(self).create_channel(
                self._host,
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        # Message wrapping has to wait until self._grpc_channel exists.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Build a gRPC channel for this service and return it.

        Args:
            host (Optional[str]): Host the channel should use.
            credentials (Optional[~.Credentials]): Authorization
                credentials attached to requests; they identify this
                application to the service. When omitted, the client
                tries to determine credentials from the environment.
            credentials_file (Optional[str]): Path of a credentials file
                loadable with
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): An optional list of scopes
                needed for this service. Only used when credentials are
                not specified, in which case they are passed to
                :func:`google.auth.default`.
            quota_project_id (Optional[str]): Optional project used for
                billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments handed on to
                channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        # The class-level defaults (scopes and host) accompany whatever the
        # caller supplied.
        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )

    @property
    def grpc_channel(self) -> grpc.Channel:
        """The channel this transport uses to reach the service."""
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        """Callable for the ``CreateWriteStream`` RPC over gRPC.

        Creates a write stream to the given table. Every table also has
        a special stream named '_default' to which data can be written;
        that stream does not need to be created with CreateWriteStream,
        can be used simultaneously by any number of clients, and data
        written to it is considered committed as soon as an
        acknowledgement is received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "create_write_stream"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return stub_cache[stub_name]

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        """Callable for the bidirectional-streaming ``AppendRows`` RPC.

        Appends data to the given stream.

        When ``offset`` is specified it is checked against the end of
        the stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` for an append to an offset beyond the
        current end of the stream, or ``ALREADY_EXISTS`` when the
        ``offset`` has already been written to. The user can retry with
        an adjusted offset within the same RPC connection. Without an
        ``offset``, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information is returned for appends to a
        default stream.

        Responses arrive in the same order in which requests are sent,
        one response per successfully inserted request. A response may
        embed error information if the originating AppendRequest was not
        successfully processed.

        When successfully appended data becomes visible in the table
        depends on the type of stream:

        -  COMMITTED streams (including the default stream): data is
           visible immediately upon successful append.

        -  BUFFERED streams: data is made visible by a subsequent
           ``FlushRows`` rpc, which advances a cursor to a newer offset
           in the stream.

        -  PENDING streams: data is not visible until the stream itself
           is finalized (``FinalizeWriteStream`` rpc) and the stream is
           explicitly committed via the ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "append_rows"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return stub_cache[stub_name]

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        """Callable for the ``GetWriteStream`` RPC over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "get_write_stream"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return stub_cache[stub_name]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        """Callable for the ``FinalizeWriteStream`` RPC over gRPC.

        Finalizes a write stream so that no new data can be appended to
        it. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "finalize_write_stream"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return stub_cache[stub_name]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        """Callable for the ``BatchCommitWriteStreams`` RPC over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, its data becomes
        available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "batch_commit_write_streams"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return stub_cache[stub_name]

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        """Callable for the ``FlushRows`` RPC over gRPC.

        Flushes rows to a BUFFERED stream.

        When rows are appended to a BUFFERED stream, a flush operation
        is required for them to become available for reading. A Flush
        operation flushes up to any previously flushed offset in a
        BUFFERED stream, to the offset specified in the request.

        Flush is not supported on the ``_default`` stream, since it is
        not BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying
                RPC on the server.
        """
        # The stub is created on first access and reused afterwards; gRPC
        # does the (de)serialization with the functions passed in here.
        stub_cache = self._stubs
        stub_name = "flush_rows"
        if stub_name not in stub_cache:
            stub_cache[stub_name] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return stub_cache[stub_name]

    def close(self):
        """Close the gRPC channel used by this transport."""
        self.grpc_channel.close()

    @property
    def kind(self) -> str:
        """Short identifier of this transport type: ``"grpc"``."""
        return "grpc"
457
458
# Names exported by ``from <module> import *``: only the gRPC transport class.
__all__ = ("BigQueryWriteGrpcTransport",)