# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
from typing import Callable, Dict, Optional, Sequence, Tuple, Union
import warnings

from google.api_core import gapic_v1, grpc_helpers
import google.auth  # type: ignore
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message
import grpc  # type: ignore
import proto  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage, stream

from .base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport

try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)

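# The interceptor below only emits its DEBUG records when ``client_logging``
# is available (CLIENT_LOGGING_SUPPORTED) and the logger is enabled. A
# minimal sketch of switching it on from application code, assuming the
# standard-library ``logging`` setup is otherwise up to the caller:
#
#     import logging
#
#     logging.basicConfig()
#     logging.getLogger("google.cloud.bigquery_storage_v1").setLevel(logging.DEBUG)
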
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    def intercept_unary_unary(self, continuation, client_call_details, request):
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            if isinstance(request, proto.Message):
                request_payload = type(request).to_json(request)
            elif isinstance(request, google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in request_metadata
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Convert gRPC metadata (e.g. `grpc.aio._metadata.Metadata`) to a
            # dict of stringified values.
            metadata = (
                {k: str(v) for k, v in response_metadata}
                if response_metadata
                else None
            )
            result = response.result()
            if isinstance(result, proto.Message):
                response_payload = type(result).to_json(result)
            elif isinstance(result, google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
                    "rpcName": client_call_details.method,
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response

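# The transport below attaches this interceptor to its channel automatically,
# so nothing more is required in normal use. Purely as an illustrative sketch,
# the same wrapping could be applied to any standalone channel (the target
# here is hypothetical):
#
#     channel = grpc.insecure_channel("localhost:50051")
#     logged_channel = grpc.intercept_channel(channel, _LoggingClientInterceptor())
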

class BigQueryWriteGrpcTransport(BigQueryWriteTransport):
    """gRPC backend transport for BigQueryWrite.

    BigQuery Write API.

    The Write API can be used to write data to BigQuery.

    For supplementary information about the Write API, see:

    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if a ``channel`` instance is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if a ``channel`` instance is provided.
            scopes (Optional[Sequence[str]]): A list of scopes. This argument is
                ignored if a ``channel`` instance is provided.
            channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
                A ``Channel`` instance through which to make calls, or a Callable
                that constructs and returns one. If set to None, ``self.create_channel``
                is used to create the channel. If a Callable is given, it will be called
                with the same arguments as used in ``self.create_channel``.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if a ``channel`` instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self-signed JWT should
                be used for service account credentials.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None

        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # initialize with the provided callable or the default channel
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Wrap messages. This must be done after self._logged_channel exists
        self._prep_wrapped_messages(client_info)

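    # A minimal usage sketch, assuming Application Default Credentials are
    # available in the environment: construct the transport directly and hand
    # it to the GAPIC client, which is the usual entry point.
    #
    #     from google.cloud.bigquery_storage_v1 import BigQueryWriteClient
    #
    #     transport = BigQueryWriteGrpcTransport()
    #     client = BigQueryWriteClient(transport=transport)
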
    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """

        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

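    # Sketch: the classmethod above can also pre-build a channel, for example
    # with extra gRPC options forwarded through ``kwargs``, which is then
    # handed to the constructor (the keepalive option here is illustrative):
    #
    #     channel = BigQueryWriteGrpcTransport.create_channel(
    #         options=[("grpc.keepalive_time_ms", 30000)],
    #     )
    #     transport = BigQueryWriteGrpcTransport(channel=channel)
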
    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_write_stream" not in self._stubs:
            self._stubs["create_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
                request_serializer=storage.CreateWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["create_write_stream"]

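    # Sketch of invoking the returned callable; the ``parent`` value is a
    # hypothetical table path:
    #
    #     request = storage.CreateWriteStreamRequest(
    #         parent="projects/my-project/datasets/my_dataset/tables/my_table",
    #         write_stream=stream.WriteStream(type_=stream.WriteStream.Type.PENDING),
    #     )
    #     write_stream = transport.create_write_stream(request)
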
    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], storage.AppendRowsResponse]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream, or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information is returned for appends to a
        default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRowsRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    ~.AppendRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "append_rows" not in self._stubs:
            self._stubs["append_rows"] = self._logged_channel.stream_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
                request_serializer=storage.AppendRowsRequest.serialize,
                response_deserializer=storage.AppendRowsResponse.deserialize,
            )
        return self._stubs["append_rows"]

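    # Sketch: this is a bidirectional streaming callable, so it is invoked
    # with an iterator of requests and yields responses. A real first request
    # must also carry the writer schema and serialized rows, elided here;
    # ``write_stream`` is the hypothetical stream from the earlier sketch:
    #
    #     requests = iter([storage.AppendRowsRequest(write_stream=write_stream.name)])
    #     for response in transport.append_rows(requests):
    #         print(response.append_result.offset)
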
    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], stream.WriteStream]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    ~.WriteStream]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "get_write_stream" not in self._stubs:
            self._stubs["get_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
                request_serializer=storage.GetWriteStreamRequest.serialize,
                response_deserializer=stream.WriteStream.deserialize,
            )
        return self._stubs["get_write_stream"]

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest], storage.FinalizeWriteStreamResponse
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    ~.FinalizeWriteStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "finalize_write_stream" not in self._stubs:
            self._stubs["finalize_write_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
                request_serializer=storage.FinalizeWriteStreamRequest.serialize,
                response_deserializer=storage.FinalizeWriteStreamResponse.deserialize,
            )
        return self._stubs["finalize_write_stream"]

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        storage.BatchCommitWriteStreamsResponse,
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    ~.BatchCommitWriteStreamsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "batch_commit_write_streams" not in self._stubs:
            self._stubs[
                "batch_commit_write_streams"
            ] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
                request_serializer=storage.BatchCommitWriteStreamsRequest.serialize,
                response_deserializer=storage.BatchCommitWriteStreamsResponse.deserialize,
            )
        return self._stubs["batch_commit_write_streams"]

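    # Sketch of the ``PENDING`` stream lifecycle described above, reusing the
    # hypothetical ``write_stream`` and table path from the earlier sketches:
    # finalize the stream, then commit it atomically with any siblings.
    #
    #     transport.finalize_write_stream(
    #         storage.FinalizeWriteStreamRequest(name=write_stream.name)
    #     )
    #     transport.batch_commit_write_streams(
    #         storage.BatchCommitWriteStreamsRequest(
    #             parent="projects/my-project/datasets/my_dataset/tables/my_table",
    #             write_streams=[write_stream.name],
    #         )
    #     )
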
    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], storage.FlushRowsResponse]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A flush operation makes rows visible from the
        previously flushed offset up to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    ~.FlushRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "flush_rows" not in self._stubs:
            self._stubs["flush_rows"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
                request_serializer=storage.FlushRowsRequest.serialize,
                response_deserializer=storage.FlushRowsResponse.deserialize,
            )
        return self._stubs["flush_rows"]

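    # Sketch for a ``BUFFERED`` stream; the offset is a hypothetical cursor up
    # to which previously appended rows become readable:
    #
    #     transport.flush_rows(
    #         storage.FlushRowsRequest(write_stream=write_stream.name, offset=100)
    #     )
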
    def close(self):
        """Close the underlying (logged) gRPC channel."""
        self._logged_channel.close()

    @property
    def kind(self) -> str:
        return "grpc"


__all__ = ("BigQueryWriteGrpcTransport",)