# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
from typing import Callable, Dict, Optional, Sequence, Tuple, Union
import warnings

from google.api_core import gapic_v1, grpc_helpers
import google.auth  # type: ignore
from google.auth import credentials as ga_credentials  # type: ignore
from google.auth.transport.grpc import SslCredentials  # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message
import grpc  # type: ignore
import proto  # type: ignore

from google.cloud.bigquery_storage_v1.types import storage, stream

from .base import DEFAULT_CLIENT_INFO, BigQueryReadTransport

try:
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):  # pragma: NO COVER
    def intercept_unary_unary(self, continuation, client_call_details, request):
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_metadata = client_call_details.metadata
            if isinstance(request, proto.Message):
                request_payload = type(request).to_json(request)
            elif isinstance(request, google.protobuf.message.Message):
                request_payload = MessageToJson(request)
            else:
                request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in request_metadata
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = response.trailing_metadata()
            # Convert the response trailing metadata to a dict of strings.
            metadata = (
                dict([(k, str(v)) for k, v in response_metadata])
                if response_metadata
                else None
            )
            result = response.result()
            if isinstance(result, proto.Message):
                response_payload = type(result).to_json(result)
            elif isinstance(result, google.protobuf.message.Message):
                response_payload = MessageToJson(result)
            else:
                response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
            grpc_response = {
                "payload": response_payload,
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response for {client_call_details.method}.",
                extra={
                    "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                    "rpcName": client_call_details.method,
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
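

# A minimal sketch of how an interceptor like the one above is attached to a
# channel (illustrative only; the transport below wires this up automatically
# in ``__init__`` via ``grpc.intercept_channel``, so this is never needed in
# normal use):
#
#     channel = grpc.secure_channel(
#         "bigquerystorage.googleapis.com:443", grpc.ssl_channel_credentials()
#     )
#     logged_channel = grpc.intercept_channel(channel, _LoggingClientInterceptor())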


class BigQueryReadGrpcTransport(BigQueryReadTransport):
    """gRPC backend transport for BigQueryRead.

    BigQuery Read API.

    The Read API can be used to read data from BigQuery.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _stubs: Dict[str, Callable]

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
144 """Instantiate the transport.
145
146 Args:
147 host (Optional[str]):
148 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
149 credentials (Optional[google.auth.credentials.Credentials]): The
150 authorization credentials to attach to requests. These
151 credentials identify the application to the service; if none
152 are specified, the client will attempt to ascertain the
153 credentials from the environment.
154 This argument is ignored if a ``channel`` instance is provided.
155 credentials_file (Optional[str]): A file with credentials that can
156 be loaded with :func:`google.auth.load_credentials_from_file`.
157 This argument is ignored if a ``channel`` instance is provided.
158 scopes (Optional(Sequence[str])): A list of scopes. This argument is
159 ignored if a ``channel`` instance is provided.
160 channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
161 A ``Channel`` instance through which to make calls, or a Callable
162 that constructs and returns one. If set to None, ``self.create_channel``
163 is used to create the channel. If a Callable is given, it will be called
164 with the same arguments as used in ``self.create_channel``.
165 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
166 If provided, it overrides the ``host`` argument and tries to create
167 a mutual TLS channel with client SSL credentials from
168 ``client_cert_source`` or application default SSL credentials.
169 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
170 Deprecated. A callback to provide client SSL certificate bytes and
171 private key bytes, both in PEM format. It is ignored if
172 ``api_mtls_endpoint`` is None.
173 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
174 for the grpc channel. It is ignored if a ``channel`` instance is provided.
175 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
176 A callback to provide client certificate bytes and private key bytes,
177 both in PEM format. It is used to configure a mutual TLS channel. It is
178 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
179 quota_project_id (Optional[str]): An optional project to use for billing
180 and quota.
181 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
182 The client info used to send a user-agent string along with
183 API requests. If ``None``, then default info will be used.
184 Generally, you only need to set this if you're developing
185 your own client library.
186 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
187 be used for service account credentials.
188
189 Raises:
190 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
191 creation failed for any reason.
192 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
193 and ``credentials_file`` are passed.
194 """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, grpc.Channel):
            # Ignore credentials if a channel was passed.
            credentials = None
            self._ignore_credentials = True
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None

        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # Initialize with the provided callable or the default channel.
            channel_init = channel or type(self).create_channel
            self._grpc_channel = channel_init(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        self._interceptor = _LoggingClientInterceptor()
        self._logged_channel = grpc.intercept_channel(
            self._grpc_channel, self._interceptor
        )

        # Wrap messages. This must be done after self._logged_channel exists.
        self._prep_wrapped_messages(client_info)
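
    # A minimal mutual-TLS configuration sketch (illustrative only; the
    # ``load_cert_and_key`` helper below is a hypothetical callback you would
    # supply, returning PEM-encoded certificate and key bytes):
    #
    #     def load_cert_and_key() -> Tuple[bytes, bytes]:
    #         with open("client.pem", "rb") as c, open("client.key", "rb") as k:
    #             return c.read(), k.read()
    #
    #     transport = BigQueryReadGrpcTransport(
    #         client_cert_source_for_mtls=load_cert_and_key
    #     )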

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> grpc.Channel:
        """Create and return a gRPC channel object.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.

        Returns:
            grpc.Channel: A gRPC channel object.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
                and ``credentials_file`` are passed.
        """

        return grpc_helpers.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )
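
    # A minimal sketch of creating a channel directly and handing it to the
    # transport (illustrative only; assumes application default credentials
    # are configured, and the project id is a placeholder):
    #
    #     channel = BigQueryReadGrpcTransport.create_channel(
    #         quota_project_id="my-billing-project",
    #     )
    #     transport = BigQueryReadGrpcTransport(channel=channel)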

    @property
    def grpc_channel(self) -> grpc.Channel:
        """Return the channel designed to connect to this service."""
        return self._grpc_channel

    @property
    def create_read_session(
        self,
    ) -> Callable[[storage.CreateReadSessionRequest], stream.ReadSession]:
        r"""Return a callable for the create read session method over gRPC.

        Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        Returns:
            Callable[[~.CreateReadSessionRequest],
                    ~.ReadSession]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_read_session" not in self._stubs:
            self._stubs["create_read_session"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession",
                request_serializer=storage.CreateReadSessionRequest.serialize,
                response_deserializer=stream.ReadSession.deserialize,
            )
        return self._stubs["create_read_session"]
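
    # A minimal sketch of calling the stub returned above (illustrative only;
    # the project and table paths are placeholders):
    #
    #     request = storage.CreateReadSessionRequest(
    #         parent="projects/my-project",
    #         read_session=stream.ReadSession(
    #             table="projects/my-project/datasets/my_dataset/tables/my_table",
    #             data_format=stream.DataFormat.AVRO,
    #         ),
    #         max_stream_count=1,
    #     )
    #     session = transport.create_read_session(request)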

    @property
    def read_rows(
        self,
    ) -> Callable[[storage.ReadRowsRequest], storage.ReadRowsResponse]:
        r"""Return a callable for the read rows method over gRPC.

        Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        Returns:
            Callable[[~.ReadRowsRequest],
                    ~.ReadRowsResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "read_rows" not in self._stubs:
            self._stubs["read_rows"] = self._logged_channel.unary_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows",
                request_serializer=storage.ReadRowsRequest.serialize,
                response_deserializer=storage.ReadRowsResponse.deserialize,
            )
        return self._stubs["read_rows"]
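
    # ReadRows is a server-streaming RPC, so the callable returned above
    # yields an iterable of responses. A minimal sketch (illustrative only;
    # ``session`` is assumed to come from a prior ``create_read_session``
    # call, and ``process`` is a placeholder):
    #
    #     request = storage.ReadRowsRequest(read_stream=session.streams[0].name)
    #     for response in transport.read_rows(request):
    #         process(response.avro_rows)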

    @property
    def split_read_stream(
        self,
    ) -> Callable[[storage.SplitReadStreamRequest], storage.SplitReadStreamResponse]:
        r"""Return a callable for the split read stream method over gRPC.

        Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        Returns:
            Callable[[~.SplitReadStreamRequest],
                    ~.SplitReadStreamResponse]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "split_read_stream" not in self._stubs:
            self._stubs["split_read_stream"] = self._logged_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream",
                request_serializer=storage.SplitReadStreamRequest.serialize,
                response_deserializer=storage.SplitReadStreamResponse.deserialize,
            )
        return self._stubs["split_read_stream"]
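
    # A minimal sketch of splitting a stream roughly in half (illustrative
    # only; ``session`` is assumed to come from ``create_read_session``):
    #
    #     request = storage.SplitReadStreamRequest(
    #         name=session.streams[0].name,
    #         fraction=0.5,  # split point as a fraction of remaining rows
    #     )
    #     response = transport.split_read_stream(request)
    #     primary, residual = response.primary_stream, response.remainder_stream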

    def close(self):
        """Close the underlying (logged) gRPC channel."""
        self._logged_channel.close()

    @property
    def kind(self) -> str:
        """Return the transport kind ("grpc")."""
        return "grpc"


__all__ = ("BigQueryReadGrpcTransport",)