1# -*- coding: utf-8 -*-
2# Copyright 2024 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from typing import Callable, Dict, Optional, Sequence, Tuple, Union
17import warnings
18
19from google.api_core import gapic_v1, grpc_helpers
20import google.auth # type: ignore
21from google.auth import credentials as ga_credentials # type: ignore
22from google.auth.transport.grpc import SslCredentials # type: ignore
23import grpc # type: ignore
24
25from google.cloud.bigquery_storage_v1.types import storage, stream
26
27from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
28
29
class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """gRPC transport for the BigQuery Write API.

    The Write API is used to write data to BigQuery. Supplementary
    information about it is available at:

    https://cloud.google.com/bigquery/docs/write-api

    The primary client loads this transport implementation and calls it,
    which is why this class exposes the same methods as the client.

    Protocol buffers are sent over the wire using gRPC (built on top of
    HTTP/2), so the ``grpcio`` package must be installed.
    """

    # Cache of lazily created RPC callables, keyed by method name.
    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[grpc.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                Hostname to connect to
                (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials to attach to requests. They
                identify the application to the service; when omitted, the
                client attempts to ascertain credentials from the
                environment. Ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that
                can be loaded with
                :func:`google.auth.load_credentials_from_file`. Ignored if
                ``channel`` is provided.
            scopes (Optional(Sequence[str])): A list of scopes. Ignored if
                ``channel`` is provided.
            channel (Optional[grpc.Channel]): A ``Channel`` instance through
                which to make calls. When given, no new channel is created.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
                endpoint. If provided, it overrides ``host`` and a mutual
                TLS channel is attempted with client SSL credentials taken
                from ``client_cert_source`` or, failing that, application
                default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback returning the client SSL certificate
                bytes and private key bytes, both in PEM format. Ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the gRPC channel. Ignored if ``channel`` is
                provided, and replaced when ``api_mtls_endpoint`` is set.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback returning client certificate bytes and private
                key bytes, both in PEM format, used to configure a mutual
                TLS channel. Ignored if ``channel`` or
                ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport's initializer.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS
                transport creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        self._stubs: Dict[str, Callable] = {}
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials

        # Both legacy mTLS arguments still work but are flagged as deprecated.
        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # A caller-supplied channel wins: credentials are ignored
            # (signalled with ``False``) and no SSL credentials are retained.
            credentials = False
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated mTLS path: the endpoint replaces the host and the SSL
            # credentials come from the callback or the application defaults.
            host = api_mtls_endpoint
            if client_cert_source:
                pem_cert, pem_key = client_cert_source()
                mtls_credentials = grpc.ssl_channel_credentials(
                    certificate_chain=pem_cert, private_key=pem_key
                )
            else:
                mtls_credentials = SslCredentials().ssl_credentials
            self._ssl_channel_credentials = mtls_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            # Current mTLS path: only used when explicit SSL channel
            # credentials were not supplied.
            pem_cert, pem_key = client_cert_source_for_mtls()
            self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                certificate_chain=pem_cert, private_key=pem_key
            )

        # The base transport sets the host, credentials and scopes.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # -1 lifts gRPC's message size limits in both directions.
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            self._grpc_channel = type(self).create_channel(
                self._host,
                # Reuse the credentials saved by the base transport, which is
                # why ``credentials_file`` is deliberately ``None`` here.
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        # Wrapping must happen only after ``self._grpc_channel`` exists.
        self._prep_wrapped_messages(client_info)

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): Authorization credentials
                to attach to requests. They identify this application to
                the service. If none are specified, the client will attempt
                to ascertain the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that
                can be loaded with
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): An optional list of scopes
                needed for this service. Only used when credentials are not
                specified; passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments, passed through
                to the channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        # The class-level scopes and host act as defaults for the helper.
        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            quota_project_id=quota_project_id,
            **kwargs,
        )

    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Every table additionally
        has a special stream named '_default' to which data can be written;
        that stream does not need to be created with CreateWriteStream and
        can be used simultaneously by any number of clients. Data written
        to it is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "create_write_stream"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. gRPC performs the serialization
        # and deserialization; it only needs the functions for each.
        stub = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
            request_serializer=storage.CreateWriteStreamRequest.serialize,
            response_deserializer=stream.WriteStream.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        When ``offset`` is specified it is checked against the end of the
        stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` for an attempt to append at an offset beyond
        the current end of the stream, or ``ALREADY_EXISTS`` if the
        ``offset`` has already been written to. The user can retry with an
        adjusted offset within the same RPC connection. Without an
        ``offset``, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information is returned for appends to a
        default stream.

        Responses are received in the same order in which requests are
        sent, with one response for each successfully inserted request.
        Responses may optionally embed error information if the originating
        AppendRequest was not successfully processed.

        When successfully appended data becomes visible in the table
        depends on the type of stream:

        - For COMMITTED streams (which includes the default stream), data
          is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset in
          the stream.

        - For PENDING streams, data is not made visible until the stream
          itself is finalized (via the ``FinalizeWriteStream`` rpc), and
          the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "append_rows"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. Unlike the other RPCs on this
        # transport, this one is registered as stream-stream.
        stub = self.grpc_channel.stream_stream(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
            request_serializer=storage.AppendRowsRequest.serialize,
            response_deserializer=storage.AppendRowsResponse.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "get_write_stream"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. gRPC performs the serialization
        # and deserialization; it only needs the functions for each.
        stub = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
            request_serializer=storage.GetWriteStreamRequest.serialize,
            response_deserializer=stream.WriteStream.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalizes a write stream so that no new data can be appended to it.
        Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "finalize_write_stream"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. gRPC performs the serialization
        # and deserialization; it only needs the functions for each.
        stub = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
            request_serializer=storage.FinalizeWriteStreamRequest.serialize,
            response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, its data becomes
        available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "batch_commit_write_streams"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. gRPC performs the serialization
        # and deserialization; it only needs the functions for each.
        stub = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
            request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
            response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, a flush operation
        is required in order for the rows to become available for reading.
        A Flush operation flushes up to any previously flushed offset in a
        BUFFERED stream, to the offset specified in the request.

        Flush is not supported on the ``_default`` stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        stub_name = "flush_rows"
        if stub_name in self._stubs:
            return self._stubs[stub_name]

        # Build the stub on first access. gRPC performs the serialization
        # and deserialization; it only needs the functions for each.
        stub = self.grpc_channel.unary_unary(
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
            request_serializer=storage.FlushRowsRequest.serialize,
            response_deserializer=storage.FlushRowsResponse.deserialize,
        )
        self._stubs[stub_name] = stub
        return stub

    def close(self):
        """Close the gRPC channel used by this transport."""
        self.grpc_channel.close()

    @property
    def kind(self) -> str:
        """Return the identifier of this transport flavour: ``"grpc"``."""
        return "grpc"
458
459
# Public API of this module: only the gRPC transport class is exported.
__all__ = ("BigQueryWriteGrpcTransport",)