# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16from collections import OrderedDict
17from http import HTTPStatus
18import json
19import logging as std_logging
20import os
21import re
22from typing import (
23 Callable,
24 Dict,
25 Iterable,
26 Iterator,
27 Mapping,
28 MutableMapping,
29 MutableSequence,
30 Optional,
31 Sequence,
32 Tuple,
33 Type,
34 Union,
35 cast,
36)
37import warnings
38
39from google.api_core import client_options as client_options_lib
40from google.api_core import exceptions as core_exceptions
41from google.api_core import gapic_v1
42from google.api_core import retry as retries
43from google.auth import credentials as ga_credentials # type: ignore
44from google.auth.exceptions import MutualTLSChannelError # type: ignore
45from google.auth.transport import mtls # type: ignore
46from google.auth.transport.grpc import SslCredentials # type: ignore
47from google.oauth2 import service_account # type: ignore
48import google.protobuf
49
50from google.cloud.bigquery_storage_v1 import gapic_version as package_version
51
52try:
53 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
54except AttributeError: # pragma: NO COVER
55 OptionalRetry = Union[retries.Retry, object, None] # type: ignore
56
57try:
58 from google.api_core import client_logging # type: ignore
59
60 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
61except ImportError: # pragma: NO COVER
62 CLIENT_LOGGING_SUPPORTED = False
63
64_LOGGER = std_logging.getLogger(__name__)
65
66from google.protobuf import timestamp_pb2 # type: ignore
67from google.rpc import status_pb2 # type: ignore
68
69from google.cloud.bigquery_storage_v1.types import storage, stream, table
70
71from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport
72from .transports.grpc import BigQueryWriteGrpcTransport
73from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport
74
75
76class BigQueryWriteClientMeta(type):
77 """Metaclass for the BigQueryWrite client.
78
79 This provides class-level methods for building and retrieving
80 support objects (e.g. transport) without polluting the client instance
81 objects.
82 """
83
84 _transport_registry = OrderedDict() # type: Dict[str, Type[BigQueryWriteTransport]]
85 _transport_registry["grpc"] = BigQueryWriteGrpcTransport
86 _transport_registry["grpc_asyncio"] = BigQueryWriteGrpcAsyncIOTransport
87
88 def get_transport_class(
89 cls,
90 label: Optional[str] = None,
91 ) -> Type[BigQueryWriteTransport]:
92 """Returns an appropriate transport class.
93
94 Args:
95 label: The name of the desired transport. If none is
96 provided, then the first transport in the registry is used.
97
98 Returns:
99 The transport class to use.
100 """
101 # If a specific transport is requested, return that one.
102 if label:
103 return cls._transport_registry[label]
104
105 # No transport is requested; return the default (that is, the first one
106 # in the dictionary).
107 return next(iter(cls._transport_registry.values()))
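
    # Illustrative usage (not part of the generated surface; shown only to
    # document the registry above):
    #
    #     BigQueryWriteClient.get_transport_class("grpc")          # BigQueryWriteGrpcTransport
    #     BigQueryWriteClient.get_transport_class("grpc_asyncio")  # BigQueryWriteGrpcAsyncIOTransport
    #     BigQueryWriteClient.get_transport_class()                # first registered transport ("grpc")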
108
109
110class BigQueryWriteClient(metaclass=BigQueryWriteClientMeta):
111 """BigQuery Write API.
112
113 The Write API can be used to write data to BigQuery.
114
115 For supplementary information about the Write API, see:
116
117 https://cloud.google.com/bigquery/docs/write-api
118 """
119
120 @staticmethod
121 def _get_default_mtls_endpoint(api_endpoint):
122 """Converts api endpoint to mTLS endpoint.
123
124 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to
125 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively.
126 Args:
127 api_endpoint (Optional[str]): the api endpoint to convert.
128 Returns:
129 str: converted mTLS api endpoint.
130 """
131 if not api_endpoint:
132 return api_endpoint
133
134 mtls_endpoint_re = re.compile(
135 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?"
136 )
137
138 m = mtls_endpoint_re.match(api_endpoint)
139 name, mtls, sandbox, googledomain = m.groups()
140 if mtls or not googledomain:
141 return api_endpoint
142
143 if sandbox:
144 return api_endpoint.replace(
145 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com"
146 )
147
148 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com")
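
    # Illustration of the rewrite rule above (example hostnames only):
    #   "bigquerystorage.googleapis.com"  -> "bigquerystorage.mtls.googleapis.com"
    #   "storage.sandbox.googleapis.com"  -> "storage.mtls.sandbox.googleapis.com"
    #   "bigquerystorage.example.com"     -> returned unchanged (not a googleapis.com host)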
149
150 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
151 DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
152 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore
153 DEFAULT_ENDPOINT
154 )
155
156 _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}"
157 _DEFAULT_UNIVERSE = "googleapis.com"
158
159 @classmethod
160 def from_service_account_info(cls, info: dict, *args, **kwargs):
161 """Creates an instance of this client using the provided credentials
162 info.
163
164 Args:
165 info (dict): The service account private key info.
166 args: Additional arguments to pass to the constructor.
167 kwargs: Additional arguments to pass to the constructor.
168
169 Returns:
170 BigQueryWriteClient: The constructed client.
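
        Example (illustrative sketch; the key file path is a placeholder):

        .. code-block:: python

            import json

            with open("service_account.json") as f:
                sa_info = json.load(f)
            client = BigQueryWriteClient.from_service_account_info(sa_info)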
171 """
172 credentials = service_account.Credentials.from_service_account_info(info)
173 kwargs["credentials"] = credentials
174 return cls(*args, **kwargs)
175
176 @classmethod
177 def from_service_account_file(cls, filename: str, *args, **kwargs):
178 """Creates an instance of this client using the provided credentials
179 file.
180
181 Args:
182 filename (str): The path to the service account private key json
183 file.
184 args: Additional arguments to pass to the constructor.
185 kwargs: Additional arguments to pass to the constructor.
186
187 Returns:
188 BigQueryWriteClient: The constructed client.
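
        Example (illustrative sketch; the key file path is a placeholder):

        .. code-block:: python

            client = BigQueryWriteClient.from_service_account_file(
                "service_account.json"
            )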
189 """
190 credentials = service_account.Credentials.from_service_account_file(filename)
191 kwargs["credentials"] = credentials
192 return cls(*args, **kwargs)
193
194 from_service_account_json = from_service_account_file
195
196 @property
197 def transport(self) -> BigQueryWriteTransport:
198 """Returns the transport used by the client instance.
199
200 Returns:
201 BigQueryWriteTransport: The transport used by the client
202 instance.
203 """
204 return self._transport
205
206 @staticmethod
207 def table_path(
208 project: str,
209 dataset: str,
210 table: str,
211 ) -> str:
212 """Returns a fully-qualified table string."""
213 return "projects/{project}/datasets/{dataset}/tables/{table}".format(
214 project=project,
215 dataset=dataset,
216 table=table,
217 )
218
219 @staticmethod
220 def parse_table_path(path: str) -> Dict[str, str]:
221 """Parses a table path into its component segments."""
222 m = re.match(
223 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$",
224 path,
225 )
226 return m.groupdict() if m else {}
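
    # Round-trip sketch (placeholder values, for illustration only):
    #   path = BigQueryWriteClient.table_path("my-project", "my_dataset", "my_table")
    #   # -> "projects/my-project/datasets/my_dataset/tables/my_table"
    #   BigQueryWriteClient.parse_table_path(path)
    #   # -> {"project": "my-project", "dataset": "my_dataset", "table": "my_table"}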
227
228 @staticmethod
229 def write_stream_path(
230 project: str,
231 dataset: str,
232 table: str,
233 stream: str,
234 ) -> str:
235 """Returns a fully-qualified write_stream string."""
236 return "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}".format(
237 project=project,
238 dataset=dataset,
239 table=table,
240 stream=stream,
241 )
242
243 @staticmethod
244 def parse_write_stream_path(path: str) -> Dict[str, str]:
245 """Parses a write_stream path into its component segments."""
246 m = re.match(
247 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)/streams/(?P<stream>.+?)$",
248 path,
249 )
250 return m.groupdict() if m else {}
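
    # Similarly (placeholder values): write_stream_path("p", "d", "t", "_default")
    # yields "projects/p/datasets/d/tables/t/streams/_default", and
    # parse_write_stream_path() recovers the four components as a dict.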
251
252 @staticmethod
253 def common_billing_account_path(
254 billing_account: str,
255 ) -> str:
256 """Returns a fully-qualified billing_account string."""
257 return "billingAccounts/{billing_account}".format(
258 billing_account=billing_account,
259 )
260
261 @staticmethod
262 def parse_common_billing_account_path(path: str) -> Dict[str, str]:
263 """Parse a billing_account path into its component segments."""
264 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path)
265 return m.groupdict() if m else {}
266
267 @staticmethod
268 def common_folder_path(
269 folder: str,
270 ) -> str:
271 """Returns a fully-qualified folder string."""
272 return "folders/{folder}".format(
273 folder=folder,
274 )
275
276 @staticmethod
277 def parse_common_folder_path(path: str) -> Dict[str, str]:
278 """Parse a folder path into its component segments."""
279 m = re.match(r"^folders/(?P<folder>.+?)$", path)
280 return m.groupdict() if m else {}
281
282 @staticmethod
283 def common_organization_path(
284 organization: str,
285 ) -> str:
286 """Returns a fully-qualified organization string."""
287 return "organizations/{organization}".format(
288 organization=organization,
289 )
290
291 @staticmethod
292 def parse_common_organization_path(path: str) -> Dict[str, str]:
        """Parse an organization path into its component segments."""
294 m = re.match(r"^organizations/(?P<organization>.+?)$", path)
295 return m.groupdict() if m else {}
296
297 @staticmethod
298 def common_project_path(
299 project: str,
300 ) -> str:
301 """Returns a fully-qualified project string."""
302 return "projects/{project}".format(
303 project=project,
304 )
305
306 @staticmethod
307 def parse_common_project_path(path: str) -> Dict[str, str]:
308 """Parse a project path into its component segments."""
309 m = re.match(r"^projects/(?P<project>.+?)$", path)
310 return m.groupdict() if m else {}
311
312 @staticmethod
313 def common_location_path(
314 project: str,
315 location: str,
316 ) -> str:
317 """Returns a fully-qualified location string."""
318 return "projects/{project}/locations/{location}".format(
319 project=project,
320 location=location,
321 )
322
323 @staticmethod
324 def parse_common_location_path(path: str) -> Dict[str, str]:
325 """Parse a location path into its component segments."""
326 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path)
327 return m.groupdict() if m else {}
328
329 @classmethod
330 def get_mtls_endpoint_and_cert_source(
331 cls, client_options: Optional[client_options_lib.ClientOptions] = None
332 ):
333 """Deprecated. Return the API endpoint and client cert source for mutual TLS.
334
335 The client cert source is determined in the following order:
336 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
337 client cert source is None.
338 (2) if `client_options.client_cert_source` is provided, use the provided one; if the
339 default client cert source exists, use the default one; otherwise the client cert
340 source is None.
341
342 The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise, if a client cert source exists, use the default mTLS endpoint;
        otherwise use the default API endpoint.
348
349 More details can be found at https://google.aip.dev/auth/4114.
350
351 Args:
352 client_options (google.api_core.client_options.ClientOptions): Custom options for the
353 client. Only the `api_endpoint` and `client_cert_source` properties may be used
354 in this method.
355
356 Returns:
357 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
358 client cert source to use.
359
360 Raises:
361 google.auth.exceptions.MutualTLSChannelError: If any errors happen.
362 """
363
364 warnings.warn(
365 "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
366 DeprecationWarning,
367 )
368 if client_options is None:
369 client_options = client_options_lib.ClientOptions()
370 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
371 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
372 if use_client_cert not in ("true", "false"):
373 raise ValueError(
374 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
375 )
376 if use_mtls_endpoint not in ("auto", "never", "always"):
377 raise MutualTLSChannelError(
378 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
379 )
380
381 # Figure out the client cert source to use.
382 client_cert_source = None
383 if use_client_cert == "true":
384 if client_options.client_cert_source:
385 client_cert_source = client_options.client_cert_source
386 elif mtls.has_default_client_cert_source():
387 client_cert_source = mtls.default_client_cert_source()
388
389 # Figure out which api endpoint to use.
390 if client_options.api_endpoint is not None:
391 api_endpoint = client_options.api_endpoint
392 elif use_mtls_endpoint == "always" or (
393 use_mtls_endpoint == "auto" and client_cert_source
394 ):
395 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
396 else:
397 api_endpoint = cls.DEFAULT_ENDPOINT
398
399 return api_endpoint, client_cert_source
400
401 @staticmethod
402 def _read_environment_variables():
403 """Returns the environment variables used by the client.
404
405 Returns:
406 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE,
407 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables.
408
409 Raises:
410 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not
411 any of ["true", "false"].
412 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
413 is not any of ["auto", "never", "always"].
414 """
415 use_client_cert = os.getenv(
416 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
417 ).lower()
418 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
419 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
420 if use_client_cert not in ("true", "false"):
421 raise ValueError(
422 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
423 )
424 if use_mtls_endpoint not in ("auto", "never", "always"):
425 raise MutualTLSChannelError(
426 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
427 )
428 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env
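
    # For example (hypothetical environment): with
    # GOOGLE_API_USE_CLIENT_CERTIFICATE=true and GOOGLE_API_USE_MTLS_ENDPOINT
    # unset, this returns (True, "auto", None).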
429
430 @staticmethod
431 def _get_client_cert_source(provided_cert_source, use_cert_flag):
432 """Return the client cert source to be used by the client.
433
434 Args:
435 provided_cert_source (bytes): The client certificate source provided.
436 use_cert_flag (bool): A flag indicating whether to use the client certificate.
437
438 Returns:
439 bytes or None: The client cert source to be used by the client.
440 """
441 client_cert_source = None
442 if use_cert_flag:
443 if provided_cert_source:
444 client_cert_source = provided_cert_source
445 elif mtls.has_default_client_cert_source():
446 client_cert_source = mtls.default_client_cert_source()
447 return client_cert_source
448
449 @staticmethod
450 def _get_api_endpoint(
451 api_override, client_cert_source, universe_domain, use_mtls_endpoint
452 ):
453 """Return the API endpoint used by the client.
454
455 Args:
456 api_override (str): The API endpoint override. If specified, this is always
457 the return value of this function and the other arguments are not used.
458 client_cert_source (bytes): The client certificate source used by the client.
459 universe_domain (str): The universe domain used by the client.
460 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters.
461 Possible values are "always", "auto", or "never".
462
463 Returns:
464 str: The API endpoint to be used by the client.
465 """
466 if api_override is not None:
467 api_endpoint = api_override
468 elif use_mtls_endpoint == "always" or (
469 use_mtls_endpoint == "auto" and client_cert_source
470 ):
471 _default_universe = BigQueryWriteClient._DEFAULT_UNIVERSE
472 if universe_domain != _default_universe:
473 raise MutualTLSChannelError(
474 f"mTLS is not supported in any universe other than {_default_universe}."
475 )
476 api_endpoint = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT
477 else:
478 api_endpoint = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE.format(
479 UNIVERSE_DOMAIN=universe_domain
480 )
481 return api_endpoint
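
    # Resolution sketch (hypothetical inputs, mirroring the branches above):
    #   _get_api_endpoint(None, None, "googleapis.com", "auto")
    #       -> "bigquerystorage.googleapis.com"
    #   _get_api_endpoint(None, cert_source, "googleapis.com", "auto")
    #       -> "bigquerystorage.mtls.googleapis.com"
    #   _get_api_endpoint("private.example:443", cert_source, "googleapis.com", "always")
    #       -> "private.example:443" (an explicit override always wins)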
482
483 @staticmethod
484 def _get_universe_domain(
485 client_universe_domain: Optional[str], universe_domain_env: Optional[str]
486 ) -> str:
487 """Return the universe domain used by the client.
488
489 Args:
490 client_universe_domain (Optional[str]): The universe domain configured via the client options.
491 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable.
492
493 Returns:
494 str: The universe domain to be used by the client.
495
496 Raises:
497 ValueError: If the universe domain is an empty string.
498 """
499 universe_domain = BigQueryWriteClient._DEFAULT_UNIVERSE
500 if client_universe_domain is not None:
501 universe_domain = client_universe_domain
502 elif universe_domain_env is not None:
503 universe_domain = universe_domain_env
504 if len(universe_domain.strip()) == 0:
505 raise ValueError("Universe Domain cannot be an empty string.")
506 return universe_domain
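
    # Precedence sketch (illustrative values): an explicit client option beats
    # the GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variable, which beats the
    # "googleapis.com" default:
    #   _get_universe_domain("example-universe.example", "ignored.example")
    #       -> "example-universe.example"
    #   _get_universe_domain(None, "env-universe.example")
    #       -> "env-universe.example"
    #   _get_universe_domain(None, None)
    #       -> "googleapis.com"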
507
508 def _validate_universe_domain(self):
509 """Validates client's and credentials' universe domains are consistent.
510
511 Returns:
512 bool: True iff the configured universe domain is valid.
513
514 Raises:
515 ValueError: If the configured universe domain is not valid.
516 """
517
518 # NOTE (b/349488459): universe validation is disabled until further notice.
519 return True
520
521 def _add_cred_info_for_auth_errors(
522 self, error: core_exceptions.GoogleAPICallError
523 ) -> None:
524 """Adds credential info string to error details for 401/403/404 errors.
525
526 Args:
527 error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info.
528 """
529 if error.code not in [
530 HTTPStatus.UNAUTHORIZED,
531 HTTPStatus.FORBIDDEN,
532 HTTPStatus.NOT_FOUND,
533 ]:
534 return
535
536 cred = self._transport._credentials
537
538 # get_cred_info is only available in google-auth>=2.35.0
539 if not hasattr(cred, "get_cred_info"):
540 return
541
542 # ignore the type check since pypy test fails when get_cred_info
543 # is not available
544 cred_info = cred.get_cred_info() # type: ignore
545 if cred_info and hasattr(error._details, "append"):
546 error._details.append(json.dumps(cred_info))
547
548 @property
549 def api_endpoint(self):
550 """Return the API endpoint used by the client instance.
551
552 Returns:
553 str: The API endpoint used by the client instance.
554 """
555 return self._api_endpoint
556
557 @property
558 def universe_domain(self) -> str:
559 """Return the universe domain used by the client instance.
560
561 Returns:
562 str: The universe domain used by the client instance.
563 """
564 return self._universe_domain
565
566 def __init__(
567 self,
568 *,
569 credentials: Optional[ga_credentials.Credentials] = None,
570 transport: Optional[
571 Union[str, BigQueryWriteTransport, Callable[..., BigQueryWriteTransport]]
572 ] = None,
573 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
574 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
575 ) -> None:
576 """Instantiates the big query write client.
577
578 Args:
579 credentials (Optional[google.auth.credentials.Credentials]): The
580 authorization credentials to attach to requests. These
581 credentials identify the application to the service; if none
582 are specified, the client will attempt to ascertain the
583 credentials from the environment.
584 transport (Optional[Union[str,BigQueryWriteTransport,Callable[..., BigQueryWriteTransport]]]):
585 The transport to use, or a Callable that constructs and returns a new transport.
586 If a Callable is given, it will be called with the same set of initialization
587 arguments as used in the BigQueryWriteTransport constructor.
588 If set to None, a transport is chosen automatically.
589 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
590 Custom options for the client.
591
592 1. The ``api_endpoint`` property can be used to override the
593 default endpoint provided by the client when ``transport`` is
594 not explicitly provided. Only if this property is not set and
595 ``transport`` was not explicitly provided, the endpoint is
596 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which may have one of the following values:
598 "always" (always use the default mTLS endpoint), "never" (always
599 use the default regular endpoint) and "auto" (auto-switch to the
600 default mTLS endpoint if client certificate is present; this is
601 the default value).
602
603 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
604 is "true", then the ``client_cert_source`` property can be used
605 to provide a client certificate for mTLS transport. If
606 not provided, the default SSL client certificate will be used if
607 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
608 set, no client certificate will be used.
609
610 3. The ``universe_domain`` property can be used to override the
611 default "googleapis.com" universe. Note that the ``api_endpoint``
612 property still takes precedence; and ``universe_domain`` is
613 currently not supported for mTLS.
614
615 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
616 The client info used to send a user-agent string along with
617 API requests. If ``None``, then default info will be used.
618 Generally, you only need to set this if you're developing
619 your own client library.
620
621 Raises:
622 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
623 creation failed for any reason.
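
        Example (illustrative; assumes Application Default Credentials are
        available in the environment, and the resource names are
        placeholders):

        .. code-block:: python

            client = BigQueryWriteClient()
            name = BigQueryWriteClient.write_stream_path(
                "my-project", "my_dataset", "my_table", "_default"
            )
            write_stream = client.get_write_stream(name=name)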
624 """
625 self._client_options = client_options
626 if isinstance(self._client_options, dict):
627 self._client_options = client_options_lib.from_dict(self._client_options)
628 if self._client_options is None:
629 self._client_options = client_options_lib.ClientOptions()
630 self._client_options = cast(
631 client_options_lib.ClientOptions, self._client_options
632 )
633
634 universe_domain_opt = getattr(self._client_options, "universe_domain", None)
635
636 (
637 self._use_client_cert,
638 self._use_mtls_endpoint,
639 self._universe_domain_env,
640 ) = BigQueryWriteClient._read_environment_variables()
641 self._client_cert_source = BigQueryWriteClient._get_client_cert_source(
642 self._client_options.client_cert_source, self._use_client_cert
643 )
644 self._universe_domain = BigQueryWriteClient._get_universe_domain(
645 universe_domain_opt, self._universe_domain_env
646 )
647 self._api_endpoint = None # updated below, depending on `transport`
648
649 # Initialize the universe domain validation.
650 self._is_universe_domain_valid = False
651
652 if CLIENT_LOGGING_SUPPORTED: # pragma: NO COVER
653 # Setup logging.
654 client_logging.initialize_logging()
655
656 api_key_value = getattr(self._client_options, "api_key", None)
657 if api_key_value and credentials:
658 raise ValueError(
659 "client_options.api_key and credentials are mutually exclusive"
660 )
661
662 # Save or instantiate the transport.
663 # Ordinarily, we provide the transport, but allowing a custom transport
664 # instance provides an extensibility point for unusual situations.
665 transport_provided = isinstance(transport, BigQueryWriteTransport)
666 if transport_provided:
667 # transport is a BigQueryWriteTransport instance.
668 if credentials or self._client_options.credentials_file or api_key_value:
669 raise ValueError(
670 "When providing a transport instance, "
671 "provide its credentials directly."
672 )
673 if self._client_options.scopes:
674 raise ValueError(
675 "When providing a transport instance, provide its scopes "
676 "directly."
677 )
678 self._transport = cast(BigQueryWriteTransport, transport)
679 self._api_endpoint = self._transport.host
680
681 self._api_endpoint = (
682 self._api_endpoint
683 or BigQueryWriteClient._get_api_endpoint(
684 self._client_options.api_endpoint,
685 self._client_cert_source,
686 self._universe_domain,
687 self._use_mtls_endpoint,
688 )
689 )
690
691 if not transport_provided:
692 import google.auth._default # type: ignore
693
694 if api_key_value and hasattr(
695 google.auth._default, "get_api_key_credentials"
696 ):
697 credentials = google.auth._default.get_api_key_credentials(
698 api_key_value
699 )
700
701 transport_init: Union[
702 Type[BigQueryWriteTransport], Callable[..., BigQueryWriteTransport]
703 ] = (
704 BigQueryWriteClient.get_transport_class(transport)
705 if isinstance(transport, str) or transport is None
706 else cast(Callable[..., BigQueryWriteTransport], transport)
707 )
708 # initialize with the provided callable or the passed in class
709 self._transport = transport_init(
710 credentials=credentials,
711 credentials_file=self._client_options.credentials_file,
712 host=self._api_endpoint,
713 scopes=self._client_options.scopes,
714 client_cert_source_for_mtls=self._client_cert_source,
715 quota_project_id=self._client_options.quota_project_id,
716 client_info=client_info,
717 always_use_jwt_access=True,
718 api_audience=self._client_options.api_audience,
719 )
720
721 if "async" not in str(self._transport):
722 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
723 std_logging.DEBUG
724 ): # pragma: NO COVER
725 _LOGGER.debug(
726 "Created client `google.cloud.bigquery.storage_v1.BigQueryWriteClient`.",
727 extra={
728 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
729 "universeDomain": getattr(
730 self._transport._credentials, "universe_domain", ""
731 ),
732 "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}",
733 "credentialsInfo": getattr(
734 self.transport._credentials, "get_cred_info", lambda: None
735 )(),
736 }
737 if hasattr(self._transport, "_credentials")
738 else {
739 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite",
740 "credentialsType": None,
741 },
742 )
743
744 def create_write_stream(
745 self,
746 request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None,
747 *,
748 parent: Optional[str] = None,
749 write_stream: Optional[stream.WriteStream] = None,
750 retry: OptionalRetry = gapic_v1.method.DEFAULT,
751 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
752 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
753 ) -> stream.WriteStream:
754 r"""Creates a write stream to the given table. Additionally, every
755 table has a special stream named '_default' to which data can be
756 written. This stream doesn't need to be created using
757 CreateWriteStream. It is a stream that can be used
758 simultaneously by any number of clients. Data written to this
759 stream is considered committed as soon as an acknowledgement is
760 received.
761
762 .. code-block:: python
763
764 # This snippet has been automatically generated and should be regarded as a
765 # code template only.
766 # It will require modifications to work:
767 # - It may require correct/in-range values for request initialization.
768 # - It may require specifying regional endpoints when creating the service
769 # client as shown in:
770 # https://googleapis.dev/python/google-api-core/latest/client_options.html
771 from google.cloud import bigquery_storage_v1
772
773 def sample_create_write_stream():
774 # Create a client
775 client = bigquery_storage_v1.BigQueryWriteClient()
776
777 # Initialize request argument(s)
778 request = bigquery_storage_v1.CreateWriteStreamRequest(
779 parent="parent_value",
780 )
781
782 # Make the request
783 response = client.create_write_stream(request=request)
784
785 # Handle the response
786 print(response)
787
788 Args:
789 request (Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]):
790 The request object. Request message for ``CreateWriteStream``.
791 parent (str):
792 Required. Reference to the table to which the stream
793 belongs, in the format of
794 ``projects/{project}/datasets/{dataset}/tables/{table}``.
795
796 This corresponds to the ``parent`` field
797 on the ``request`` instance; if ``request`` is provided, this
798 should not be set.
799 write_stream (google.cloud.bigquery_storage_v1.types.WriteStream):
800 Required. Stream to be created.
801 This corresponds to the ``write_stream`` field
802 on the ``request`` instance; if ``request`` is provided, this
803 should not be set.
804 retry (google.api_core.retry.Retry): Designation of what errors, if any,
805 should be retried.
806 timeout (float): The timeout for this request.
807 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
808 sent along with the request as metadata. Normally, each value must be of type `str`,
809 but for metadata keys ending with the suffix `-bin`, the corresponding values must
810 be of type `bytes`.
811
812 Returns:
813 google.cloud.bigquery_storage_v1.types.WriteStream:
814 Information about a single stream
815 that gets data inside the storage
816 system.
817
818 """
819 # Create or coerce a protobuf request object.
820 # - Quick check: If we got a request object, we should *not* have
821 # gotten any keyword arguments that map to the request.
822 flattened_params = [parent, write_stream]
823 has_flattened_params = (
824 len([param for param in flattened_params if param is not None]) > 0
825 )
826 if request is not None and has_flattened_params:
827 raise ValueError(
828 "If the `request` argument is set, then none of "
829 "the individual field arguments should be set."
830 )
831
832 # - Use the request object if provided (there's no risk of modifying the input as
833 # there are no flattened fields), or create one.
834 if not isinstance(request, storage.CreateWriteStreamRequest):
835 request = storage.CreateWriteStreamRequest(request)
836 # If we have keyword arguments corresponding to fields on the
837 # request, apply these.
838 if parent is not None:
839 request.parent = parent
840 if write_stream is not None:
841 request.write_stream = write_stream
842
843 # Wrap the RPC method; this adds retry and timeout information,
844 # and friendly error handling.
845 rpc = self._transport._wrapped_methods[self._transport.create_write_stream]
846
847 # Certain fields should be provided within the metadata header;
848 # add these here.
849 metadata = tuple(metadata) + (
850 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
851 )
852
853 # Validate the universe domain.
854 self._validate_universe_domain()
855
856 # Send the request.
857 response = rpc(
858 request,
859 retry=retry,
860 timeout=timeout,
861 metadata=metadata,
862 )
863
864 # Done; return the response.
865 return response
866
867 def append_rows(
868 self,
869 requests: Optional[Iterator[storage.AppendRowsRequest]] = None,
870 *,
871 retry: OptionalRetry = gapic_v1.method.DEFAULT,
872 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
873 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
874 ) -> Iterable[storage.AppendRowsResponse]:
875 r"""Appends data to the given stream.
876
877 If ``offset`` is specified, the ``offset`` is checked against
878 the end of stream. The server returns ``OUT_OF_RANGE`` in
879 ``AppendRowsResponse`` if an attempt is made to append to an
880 offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if the user provides an ``offset`` that has
        already been written to. The user can retry with an adjusted
        offset within the same RPC connection. If ``offset`` is not
        specified, the append happens at the end of the stream.
885
886 The response contains an optional offset at which the append
887 happened. No offset information will be returned for appends to
888 a default stream.
889
        Responses are received in the same order in which requests are
        sent. There will be one response for each successfully inserted
        request. Responses may optionally embed error information if the
        originating AppendRowsRequest was not successfully processed.
894
895 The specifics of when successfully appended data is made visible
896 to the table are governed by the type of stream:
897
898 - For COMMITTED streams (which includes the default stream),
899 data is visible immediately upon successful append.
900
901 - For BUFFERED streams, data is made visible via a subsequent
902 ``FlushRows`` rpc which advances a cursor to a newer offset
903 in the stream.
904
905 - For PENDING streams, data is not made visible until the
906 stream itself is finalized (via the ``FinalizeWriteStream``
907 rpc), and the stream is explicitly committed via the
908 ``BatchCommitWriteStreams`` rpc.
909
910 .. code-block:: python
911
912 # This snippet has been automatically generated and should be regarded as a
913 # code template only.
914 # It will require modifications to work:
915 # - It may require correct/in-range values for request initialization.
916 # - It may require specifying regional endpoints when creating the service
917 # client as shown in:
918 # https://googleapis.dev/python/google-api-core/latest/client_options.html
919 from google.cloud import bigquery_storage_v1
920
921 def sample_append_rows():
922 # Create a client
923 client = bigquery_storage_v1.BigQueryWriteClient()
924
925 # Initialize request argument(s)
926 request = bigquery_storage_v1.AppendRowsRequest(
927 write_stream="write_stream_value",
928 )
929
930 # This method expects an iterator which contains
931 # 'bigquery_storage_v1.AppendRowsRequest' objects
932 # Here we create a generator that yields a single `request` for
933 # demonstrative purposes.
934 requests = [request]
935
936 def request_generator():
937 for request in requests:
938 yield request
939
940 # Make the request
941 stream = client.append_rows(requests=request_generator())
942
943 # Handle the response
944 for response in stream:
945 print(response)
946
947 Args:
948 requests (Iterator[google.cloud.bigquery_storage_v1.types.AppendRowsRequest]):
949 The request object iterator. Request message for ``AppendRows``.
950
951 Because AppendRows is a bidirectional streaming RPC,
952 certain parts of the AppendRowsRequest need only be
953 specified for the first request before switching table
954 destinations. You can also switch table destinations
955 within the same connection for the default stream.
956
                A single AppendRowsRequest must be less than 10 MB in
                size. Requests larger than this return an error,
                typically ``INVALID_ARGUMENT``.
960 retry (google.api_core.retry.Retry): Designation of what errors, if any,
961 should be retried.
962 timeout (float): The timeout for this request.
963 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
964 sent along with the request as metadata. Normally, each value must be of type `str`,
965 but for metadata keys ending with the suffix `-bin`, the corresponding values must
966 be of type `bytes`.
967
968 Returns:
969 Iterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]:
970 Response message for AppendRows.
971 """
972
973 # Wrap the RPC method; this adds retry and timeout information,
974 # and friendly error handling.
975 rpc = self._transport._wrapped_methods[self._transport.append_rows]
976
977 # Certain fields should be provided within the metadata header;
978 # add these here.
979 metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),)
980
981 # Validate the universe domain.
982 self._validate_universe_domain()
983
984 # Send the request.
985 response = rpc(
986 requests,
987 retry=retry,
988 timeout=timeout,
989 metadata=metadata,
990 )
991
992 # Done; return the response.
993 return response
994
995 def get_write_stream(
996 self,
997 request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None,
998 *,
999 name: Optional[str] = None,
1000 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1001 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1002 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1003 ) -> stream.WriteStream:
1004 r"""Gets information about a write stream.
1005
1006 .. code-block:: python
1007
1008 # This snippet has been automatically generated and should be regarded as a
1009 # code template only.
1010 # It will require modifications to work:
1011 # - It may require correct/in-range values for request initialization.
1012 # - It may require specifying regional endpoints when creating the service
1013 # client as shown in:
1014 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1015 from google.cloud import bigquery_storage_v1
1016
1017 def sample_get_write_stream():
1018 # Create a client
1019 client = bigquery_storage_v1.BigQueryWriteClient()
1020
1021 # Initialize request argument(s)
1022 request = bigquery_storage_v1.GetWriteStreamRequest(
1023 name="name_value",
1024 )
1025
1026 # Make the request
1027 response = client.get_write_stream(request=request)
1028
1029 # Handle the response
1030 print(response)
1031
1032 Args:
1033 request (Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]):
                The request object. Request message for ``GetWriteStream``.
1035 name (str):
1036 Required. Name of the stream to get, in the form of
1037 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
1038
1039 This corresponds to the ``name`` field
1040 on the ``request`` instance; if ``request`` is provided, this
1041 should not be set.
1042 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1043 should be retried.
1044 timeout (float): The timeout for this request.
1045 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1046 sent along with the request as metadata. Normally, each value must be of type `str`,
1047 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1048 be of type `bytes`.
1049
1050 Returns:
1051 google.cloud.bigquery_storage_v1.types.WriteStream:
1052 Information about a single stream
1053 that gets data inside the storage
1054 system.
1055
1056 """
1057 # Create or coerce a protobuf request object.
1058 # - Quick check: If we got a request object, we should *not* have
1059 # gotten any keyword arguments that map to the request.
1060 flattened_params = [name]
1061 has_flattened_params = (
1062 len([param for param in flattened_params if param is not None]) > 0
1063 )
1064 if request is not None and has_flattened_params:
1065 raise ValueError(
1066 "If the `request` argument is set, then none of "
1067 "the individual field arguments should be set."
1068 )
1069
1070 # - Use the request object if provided (there's no risk of modifying the input as
1071 # there are no flattened fields), or create one.
1072 if not isinstance(request, storage.GetWriteStreamRequest):
1073 request = storage.GetWriteStreamRequest(request)
1074 # If we have keyword arguments corresponding to fields on the
1075 # request, apply these.
1076 if name is not None:
1077 request.name = name
1078
1079 # Wrap the RPC method; this adds retry and timeout information,
1080 # and friendly error handling.
1081 rpc = self._transport._wrapped_methods[self._transport.get_write_stream]
1082
1083 # Certain fields should be provided within the metadata header;
1084 # add these here.
1085 metadata = tuple(metadata) + (
1086 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1087 )
1088
1089 # Validate the universe domain.
1090 self._validate_universe_domain()
1091
1092 # Send the request.
1093 response = rpc(
1094 request,
1095 retry=retry,
1096 timeout=timeout,
1097 metadata=metadata,
1098 )
1099
1100 # Done; return the response.
1101 return response
1102
1103 def finalize_write_stream(
1104 self,
1105 request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None,
1106 *,
1107 name: Optional[str] = None,
1108 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1109 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1110 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1111 ) -> storage.FinalizeWriteStreamResponse:
1112 r"""Finalize a write stream so that no new data can be appended to
1113 the stream. Finalize is not supported on the '_default' stream.
1114
1115 .. code-block:: python
1116
1117 # This snippet has been automatically generated and should be regarded as a
1118 # code template only.
1119 # It will require modifications to work:
1120 # - It may require correct/in-range values for request initialization.
1121 # - It may require specifying regional endpoints when creating the service
1122 # client as shown in:
1123 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1124 from google.cloud import bigquery_storage_v1
1125
1126 def sample_finalize_write_stream():
1127 # Create a client
1128 client = bigquery_storage_v1.BigQueryWriteClient()
1129
1130 # Initialize request argument(s)
1131 request = bigquery_storage_v1.FinalizeWriteStreamRequest(
1132 name="name_value",
1133 )
1134
1135 # Make the request
1136 response = client.finalize_write_stream(request=request)
1137
1138 # Handle the response
1139 print(response)
1140
1141 Args:
1142 request (Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]):
1143 The request object. Request message for invoking ``FinalizeWriteStream``.
1144 name (str):
1145 Required. Name of the stream to finalize, in the form of
1146 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``.
1147
1148 This corresponds to the ``name`` field
1149 on the ``request`` instance; if ``request`` is provided, this
1150 should not be set.
1151 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1152 should be retried.
1153 timeout (float): The timeout for this request.
1154 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1155 sent along with the request as metadata. Normally, each value must be of type `str`,
1156 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1157 be of type `bytes`.
1158
1159 Returns:
1160 google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse:
1161 Response message for FinalizeWriteStream.
1162 """
1163 # Create or coerce a protobuf request object.
1164 # - Quick check: If we got a request object, we should *not* have
1165 # gotten any keyword arguments that map to the request.
1166 flattened_params = [name]
1167 has_flattened_params = (
1168 len([param for param in flattened_params if param is not None]) > 0
1169 )
1170 if request is not None and has_flattened_params:
1171 raise ValueError(
1172 "If the `request` argument is set, then none of "
1173 "the individual field arguments should be set."
1174 )
1175
1176 # - Use the request object if provided (there's no risk of modifying the input as
1177 # there are no flattened fields), or create one.
1178 if not isinstance(request, storage.FinalizeWriteStreamRequest):
1179 request = storage.FinalizeWriteStreamRequest(request)
1180 # If we have keyword arguments corresponding to fields on the
1181 # request, apply these.
1182 if name is not None:
1183 request.name = name
1184
1185 # Wrap the RPC method; this adds retry and timeout information,
1186 # and friendly error handling.
1187 rpc = self._transport._wrapped_methods[self._transport.finalize_write_stream]
1188
1189 # Certain fields should be provided within the metadata header;
1190 # add these here.
1191 metadata = tuple(metadata) + (
1192 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
1193 )
1194
1195 # Validate the universe domain.
1196 self._validate_universe_domain()
1197
1198 # Send the request.
1199 response = rpc(
1200 request,
1201 retry=retry,
1202 timeout=timeout,
1203 metadata=metadata,
1204 )
1205
1206 # Done; return the response.
1207 return response
1208
1209 def batch_commit_write_streams(
1210 self,
1211 request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None,
1212 *,
1213 parent: Optional[str] = None,
1214 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1215 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1216 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1217 ) -> storage.BatchCommitWriteStreamsResponse:
1218 r"""Atomically commits a group of ``PENDING`` streams that belong to
1219 the same ``parent`` table.
1220
1221 Streams must be finalized before commit and cannot be committed
1222 multiple times. Once a stream is committed, data in the stream
1223 becomes available for read operations.
1224
1225 .. code-block:: python
1226
1227 # This snippet has been automatically generated and should be regarded as a
1228 # code template only.
1229 # It will require modifications to work:
1230 # - It may require correct/in-range values for request initialization.
1231 # - It may require specifying regional endpoints when creating the service
1232 # client as shown in:
1233 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1234 from google.cloud import bigquery_storage_v1
1235
1236 def sample_batch_commit_write_streams():
1237 # Create a client
1238 client = bigquery_storage_v1.BigQueryWriteClient()
1239
1240 # Initialize request argument(s)
1241 request = bigquery_storage_v1.BatchCommitWriteStreamsRequest(
1242 parent="parent_value",
1243 write_streams=['write_streams_value1', 'write_streams_value2'],
1244 )
1245
1246 # Make the request
1247 response = client.batch_commit_write_streams(request=request)
1248
1249 # Handle the response
1250 print(response)
1251
1252 Args:
1253 request (Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]):
1254 The request object. Request message for ``BatchCommitWriteStreams``.
1255 parent (str):
1256 Required. Parent table that all the streams should
1257 belong to, in the form of
1258 ``projects/{project}/datasets/{dataset}/tables/{table}``.
1259
1260 This corresponds to the ``parent`` field
1261 on the ``request`` instance; if ``request`` is provided, this
1262 should not be set.
1263 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1264 should be retried.
1265 timeout (float): The timeout for this request.
1266 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1267 sent along with the request as metadata. Normally, each value must be of type `str`,
1268 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1269 be of type `bytes`.
1270
1271 Returns:
1272 google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse:
1273 Response message for BatchCommitWriteStreams.
1274 """
1275 # Create or coerce a protobuf request object.
1276 # - Quick check: If we got a request object, we should *not* have
1277 # gotten any keyword arguments that map to the request.
1278 flattened_params = [parent]
1279 has_flattened_params = (
1280 len([param for param in flattened_params if param is not None]) > 0
1281 )
1282 if request is not None and has_flattened_params:
1283 raise ValueError(
1284 "If the `request` argument is set, then none of "
1285 "the individual field arguments should be set."
1286 )
1287
1288 # - Use the request object if provided (there's no risk of modifying the input as
1289 # there are no flattened fields), or create one.
1290 if not isinstance(request, storage.BatchCommitWriteStreamsRequest):
1291 request = storage.BatchCommitWriteStreamsRequest(request)
1292 # If we have keyword arguments corresponding to fields on the
1293 # request, apply these.
1294 if parent is not None:
1295 request.parent = parent
1296
1297 # Wrap the RPC method; this adds retry and timeout information,
1298 # and friendly error handling.
1299 rpc = self._transport._wrapped_methods[
1300 self._transport.batch_commit_write_streams
1301 ]
1302
1303 # Certain fields should be provided within the metadata header;
1304 # add these here.
1305 metadata = tuple(metadata) + (
1306 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)),
1307 )
1308
1309 # Validate the universe domain.
1310 self._validate_universe_domain()
1311
1312 # Send the request.
1313 response = rpc(
1314 request,
1315 retry=retry,
1316 timeout=timeout,
1317 metadata=metadata,
1318 )
1319
1320 # Done; return the response.
1321 return response
1322
1323 def flush_rows(
1324 self,
1325 request: Optional[Union[storage.FlushRowsRequest, dict]] = None,
1326 *,
1327 write_stream: Optional[str] = None,
1328 retry: OptionalRetry = gapic_v1.method.DEFAULT,
1329 timeout: Union[float, object] = gapic_v1.method.DEFAULT,
1330 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
1331 ) -> storage.FlushRowsResponse:
1332 r"""Flushes rows to a BUFFERED stream.
1333
        If users are appending rows to a BUFFERED stream, a flush
        operation is required in order for the rows to become available
        for reading. A flush operation advances the flushed offset of a
        BUFFERED stream from any previously flushed offset up to the
        offset specified in the request.
1339
1340 Flush is not supported on the \_default stream, since it is not
1341 BUFFERED.
1342
1343 .. code-block:: python
1344
1345 # This snippet has been automatically generated and should be regarded as a
1346 # code template only.
1347 # It will require modifications to work:
1348 # - It may require correct/in-range values for request initialization.
1349 # - It may require specifying regional endpoints when creating the service
1350 # client as shown in:
1351 # https://googleapis.dev/python/google-api-core/latest/client_options.html
1352 from google.cloud import bigquery_storage_v1
1353
1354 def sample_flush_rows():
1355 # Create a client
1356 client = bigquery_storage_v1.BigQueryWriteClient()
1357
1358 # Initialize request argument(s)
1359 request = bigquery_storage_v1.FlushRowsRequest(
1360 write_stream="write_stream_value",
1361 )
1362
1363 # Make the request
1364 response = client.flush_rows(request=request)
1365
1366 # Handle the response
1367 print(response)
1368
1369 Args:
1370 request (Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]):
1371 The request object. Request message for ``FlushRows``.
1372 write_stream (str):
1373 Required. The stream that is the
1374 target of the flush operation.
1375
1376 This corresponds to the ``write_stream`` field
1377 on the ``request`` instance; if ``request`` is provided, this
1378 should not be set.
1379 retry (google.api_core.retry.Retry): Designation of what errors, if any,
1380 should be retried.
1381 timeout (float): The timeout for this request.
1382 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
1383 sent along with the request as metadata. Normally, each value must be of type `str`,
1384 but for metadata keys ending with the suffix `-bin`, the corresponding values must
1385 be of type `bytes`.
1386
1387 Returns:
1388 google.cloud.bigquery_storage_v1.types.FlushRowsResponse:
                Response message for FlushRows.
1390 """
1391 # Create or coerce a protobuf request object.
1392 # - Quick check: If we got a request object, we should *not* have
1393 # gotten any keyword arguments that map to the request.
1394 flattened_params = [write_stream]
1395 has_flattened_params = (
1396 len([param for param in flattened_params if param is not None]) > 0
1397 )
1398 if request is not None and has_flattened_params:
1399 raise ValueError(
1400 "If the `request` argument is set, then none of "
1401 "the individual field arguments should be set."
1402 )
1403
1404 # - Use the request object if provided (there's no risk of modifying the input as
1405 # there are no flattened fields), or create one.
1406 if not isinstance(request, storage.FlushRowsRequest):
1407 request = storage.FlushRowsRequest(request)
1408 # If we have keyword arguments corresponding to fields on the
1409 # request, apply these.
1410 if write_stream is not None:
1411 request.write_stream = write_stream
1412
1413 # Wrap the RPC method; this adds retry and timeout information,
1414 # and friendly error handling.
1415 rpc = self._transport._wrapped_methods[self._transport.flush_rows]
1416
1417 # Certain fields should be provided within the metadata header;
1418 # add these here.
1419 metadata = tuple(metadata) + (
1420 gapic_v1.routing_header.to_grpc_metadata(
1421 (("write_stream", request.write_stream),)
1422 ),
1423 )
1424
1425 # Validate the universe domain.
1426 self._validate_universe_domain()
1427
1428 # Send the request.
1429 response = rpc(
1430 request,
1431 retry=retry,
1432 timeout=timeout,
1433 metadata=metadata,
1434 )
1435
1436 # Done; return the response.
1437 return response
1438
1439 def __enter__(self) -> "BigQueryWriteClient":
1440 return self
1441
1442 def __exit__(self, type, value, traceback):
1443 """Releases underlying transport's resources.
1444
1445 .. warning::
1446 ONLY use as a context manager if the transport is NOT shared
1447 with other clients! Exiting the with block will CLOSE the transport
1448 and may cause errors in other clients!
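
        A minimal usage sketch (illustrative only):

        .. code-block:: python

            with BigQueryWriteClient() as client:
                ...  # make calls here; the transport is closed on exit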
1449 """
1450 self.transport.close()
1451
1452
1453DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
1454 gapic_version=package_version.__version__
1455)
1456
1457if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER
1458 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__
1459
1460__all__ = ("BigQueryWriteClient",)