Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/client.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

315 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17from http import HTTPStatus 

18import json 

19import logging as std_logging 

20import os 

21import re 

22from typing import ( 

23 Callable, 

24 Dict, 

25 Iterable, 

26 Iterator, 

27 Mapping, 

28 MutableMapping, 

29 MutableSequence, 

30 Optional, 

31 Sequence, 

32 Tuple, 

33 Type, 

34 Union, 

35 cast, 

36) 

37import warnings 

38 

39from google.api_core import client_options as client_options_lib 

40from google.api_core import exceptions as core_exceptions 

41from google.api_core import gapic_v1 

42from google.api_core import retry as retries 

43from google.auth import credentials as ga_credentials # type: ignore 

44from google.auth.exceptions import MutualTLSChannelError # type: ignore 

45from google.auth.transport import mtls # type: ignore 

46from google.auth.transport.grpc import SslCredentials # type: ignore 

47from google.oauth2 import service_account # type: ignore 

48import google.protobuf 

49 

50from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

51 

52try: 

53 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] 

54except AttributeError: # pragma: NO COVER 

55 OptionalRetry = Union[retries.Retry, object, None] # type: ignore 

56 

57try: 

58 from google.api_core import client_logging # type: ignore 

59 

60 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

61except ImportError: # pragma: NO COVER 

62 CLIENT_LOGGING_SUPPORTED = False 

63 

64_LOGGER = std_logging.getLogger(__name__) 

65 

66from google.protobuf import timestamp_pb2 # type: ignore 

67from google.rpc import status_pb2 # type: ignore 

68 

69from google.cloud.bigquery_storage_v1.types import storage, stream, table 

70 

71from .transports.base import DEFAULT_CLIENT_INFO, BigQueryWriteTransport 

72from .transports.grpc import BigQueryWriteGrpcTransport 

73from .transports.grpc_asyncio import BigQueryWriteGrpcAsyncIOTransport 

74 

75 

76class BigQueryWriteClientMeta(type): 

77 """Metaclass for the BigQueryWrite client. 

78 

79 This provides class-level methods for building and retrieving 

80 support objects (e.g. transport) without polluting the client instance 

81 objects. 

82 """ 

83 

84 _transport_registry = OrderedDict() # type: Dict[str, Type[BigQueryWriteTransport]] 

85 _transport_registry["grpc"] = BigQueryWriteGrpcTransport 

86 _transport_registry["grpc_asyncio"] = BigQueryWriteGrpcAsyncIOTransport 

87 

88 def get_transport_class( 

89 cls, 

90 label: Optional[str] = None, 

91 ) -> Type[BigQueryWriteTransport]: 

92 """Returns an appropriate transport class. 

93 

94 Args: 

95 label: The name of the desired transport. If none is 

96 provided, then the first transport in the registry is used. 

97 

98 Returns: 

99 The transport class to use. 

100 """ 

101 # If a specific transport is requested, return that one. 

102 if label: 

103 return cls._transport_registry[label] 

104 

105 # No transport is requested; return the default (that is, the first one 

106 # in the dictionary). 

107 return next(iter(cls._transport_registry.values())) 

108 

109 

110class BigQueryWriteClient(metaclass=BigQueryWriteClientMeta): 

111 """BigQuery Write API. 

112 

113 The Write API can be used to write data to BigQuery. 

114 

115 For supplementary information about the Write API, see: 

116 

117 https://cloud.google.com/bigquery/docs/write-api 

118 """ 

119 

120 @staticmethod 

121 def _get_default_mtls_endpoint(api_endpoint): 

122 """Converts api endpoint to mTLS endpoint. 

123 

124 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to 

125 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively. 

126 Args: 

127 api_endpoint (Optional[str]): the api endpoint to convert. 

128 Returns: 

129 str: converted mTLS api endpoint. 

130 """ 

131 if not api_endpoint: 

132 return api_endpoint 

133 

134 mtls_endpoint_re = re.compile( 

135 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?" 

136 ) 

137 

138 m = mtls_endpoint_re.match(api_endpoint) 

139 name, mtls, sandbox, googledomain = m.groups() 

140 if mtls or not googledomain: 

141 return api_endpoint 

142 

143 if sandbox: 

144 return api_endpoint.replace( 

145 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com" 

146 ) 

147 

148 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com") 

149 

150 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead. 

151 DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com" 

152 DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__( # type: ignore 

153 DEFAULT_ENDPOINT 

154 ) 

155 

156 _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}" 

157 _DEFAULT_UNIVERSE = "googleapis.com" 

158 

159 @classmethod 

160 def from_service_account_info(cls, info: dict, *args, **kwargs): 

161 """Creates an instance of this client using the provided credentials 

162 info. 

163 

164 Args: 

165 info (dict): The service account private key info. 

166 args: Additional arguments to pass to the constructor. 

167 kwargs: Additional arguments to pass to the constructor. 

168 

169 Returns: 

170 BigQueryWriteClient: The constructed client. 

171 """ 

172 credentials = service_account.Credentials.from_service_account_info(info) 

173 kwargs["credentials"] = credentials 

174 return cls(*args, **kwargs) 

175 

176 @classmethod 

177 def from_service_account_file(cls, filename: str, *args, **kwargs): 

178 """Creates an instance of this client using the provided credentials 

179 file. 

180 

181 Args: 

182 filename (str): The path to the service account private key json 

183 file. 

184 args: Additional arguments to pass to the constructor. 

185 kwargs: Additional arguments to pass to the constructor. 

186 

187 Returns: 

188 BigQueryWriteClient: The constructed client. 

189 """ 

190 credentials = service_account.Credentials.from_service_account_file(filename) 

191 kwargs["credentials"] = credentials 

192 return cls(*args, **kwargs) 

193 

194 from_service_account_json = from_service_account_file 

195 

196 @property 

197 def transport(self) -> BigQueryWriteTransport: 

198 """Returns the transport used by the client instance. 

199 

200 Returns: 

201 BigQueryWriteTransport: The transport used by the client 

202 instance. 

203 """ 

204 return self._transport 

205 

206 @staticmethod 

207 def table_path( 

208 project: str, 

209 dataset: str, 

210 table: str, 

211 ) -> str: 

212 """Returns a fully-qualified table string.""" 

213 return "projects/{project}/datasets/{dataset}/tables/{table}".format( 

214 project=project, 

215 dataset=dataset, 

216 table=table, 

217 ) 

218 

219 @staticmethod 

220 def parse_table_path(path: str) -> Dict[str, str]: 

221 """Parses a table path into its component segments.""" 

222 m = re.match( 

223 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$", 

224 path, 

225 ) 

226 return m.groupdict() if m else {} 

227 

228 @staticmethod 

229 def write_stream_path( 

230 project: str, 

231 dataset: str, 

232 table: str, 

233 stream: str, 

234 ) -> str: 

235 """Returns a fully-qualified write_stream string.""" 

236 return "projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}".format( 

237 project=project, 

238 dataset=dataset, 

239 table=table, 

240 stream=stream, 

241 ) 

242 

243 @staticmethod 

244 def parse_write_stream_path(path: str) -> Dict[str, str]: 

245 """Parses a write_stream path into its component segments.""" 

246 m = re.match( 

247 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)/streams/(?P<stream>.+?)$", 

248 path, 

249 ) 

250 return m.groupdict() if m else {} 

251 

252 @staticmethod 

253 def common_billing_account_path( 

254 billing_account: str, 

255 ) -> str: 

256 """Returns a fully-qualified billing_account string.""" 

257 return "billingAccounts/{billing_account}".format( 

258 billing_account=billing_account, 

259 ) 

260 

261 @staticmethod 

262 def parse_common_billing_account_path(path: str) -> Dict[str, str]: 

263 """Parse a billing_account path into its component segments.""" 

264 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path) 

265 return m.groupdict() if m else {} 

266 

267 @staticmethod 

268 def common_folder_path( 

269 folder: str, 

270 ) -> str: 

271 """Returns a fully-qualified folder string.""" 

272 return "folders/{folder}".format( 

273 folder=folder, 

274 ) 

275 

276 @staticmethod 

277 def parse_common_folder_path(path: str) -> Dict[str, str]: 

278 """Parse a folder path into its component segments.""" 

279 m = re.match(r"^folders/(?P<folder>.+?)$", path) 

280 return m.groupdict() if m else {} 

281 

282 @staticmethod 

283 def common_organization_path( 

284 organization: str, 

285 ) -> str: 

286 """Returns a fully-qualified organization string.""" 

287 return "organizations/{organization}".format( 

288 organization=organization, 

289 ) 

290 

291 @staticmethod 

292 def parse_common_organization_path(path: str) -> Dict[str, str]: 

293 """Parse a organization path into its component segments.""" 

294 m = re.match(r"^organizations/(?P<organization>.+?)$", path) 

295 return m.groupdict() if m else {} 

296 

297 @staticmethod 

298 def common_project_path( 

299 project: str, 

300 ) -> str: 

301 """Returns a fully-qualified project string.""" 

302 return "projects/{project}".format( 

303 project=project, 

304 ) 

305 

306 @staticmethod 

307 def parse_common_project_path(path: str) -> Dict[str, str]: 

308 """Parse a project path into its component segments.""" 

309 m = re.match(r"^projects/(?P<project>.+?)$", path) 

310 return m.groupdict() if m else {} 

311 

312 @staticmethod 

313 def common_location_path( 

314 project: str, 

315 location: str, 

316 ) -> str: 

317 """Returns a fully-qualified location string.""" 

318 return "projects/{project}/locations/{location}".format( 

319 project=project, 

320 location=location, 

321 ) 

322 

323 @staticmethod 

324 def parse_common_location_path(path: str) -> Dict[str, str]: 

325 """Parse a location path into its component segments.""" 

326 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path) 

327 return m.groupdict() if m else {} 

328 

329 @classmethod 

330 def get_mtls_endpoint_and_cert_source( 

331 cls, client_options: Optional[client_options_lib.ClientOptions] = None 

332 ): 

333 """Deprecated. Return the API endpoint and client cert source for mutual TLS. 

334 

335 The client cert source is determined in the following order: 

336 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the 

337 client cert source is None. 

338 (2) if `client_options.client_cert_source` is provided, use the provided one; if the 

339 default client cert source exists, use the default one; otherwise the client cert 

340 source is None. 

341 

342 The API endpoint is determined in the following order: 

343 (1) if `client_options.api_endpoint` if provided, use the provided one. 

344 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the 

345 default mTLS endpoint; if the environment variable is "never", use the default API 

346 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise 

347 use the default API endpoint. 

348 

349 More details can be found at https://google.aip.dev/auth/4114. 

350 

351 Args: 

352 client_options (google.api_core.client_options.ClientOptions): Custom options for the 

353 client. Only the `api_endpoint` and `client_cert_source` properties may be used 

354 in this method. 

355 

356 Returns: 

357 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the 

358 client cert source to use. 

359 

360 Raises: 

361 google.auth.exceptions.MutualTLSChannelError: If any errors happen. 

362 """ 

363 

364 warnings.warn( 

365 "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.", 

366 DeprecationWarning, 

367 ) 

368 if client_options is None: 

369 client_options = client_options_lib.ClientOptions() 

370 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") 

371 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto") 

372 if use_client_cert not in ("true", "false"): 

373 raise ValueError( 

374 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

375 ) 

376 if use_mtls_endpoint not in ("auto", "never", "always"): 

377 raise MutualTLSChannelError( 

378 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

379 ) 

380 

381 # Figure out the client cert source to use. 

382 client_cert_source = None 

383 if use_client_cert == "true": 

384 if client_options.client_cert_source: 

385 client_cert_source = client_options.client_cert_source 

386 elif mtls.has_default_client_cert_source(): 

387 client_cert_source = mtls.default_client_cert_source() 

388 

389 # Figure out which api endpoint to use. 

390 if client_options.api_endpoint is not None: 

391 api_endpoint = client_options.api_endpoint 

392 elif use_mtls_endpoint == "always" or ( 

393 use_mtls_endpoint == "auto" and client_cert_source 

394 ): 

395 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT 

396 else: 

397 api_endpoint = cls.DEFAULT_ENDPOINT 

398 

399 return api_endpoint, client_cert_source 

400 

401 @staticmethod 

402 def _read_environment_variables(): 

403 """Returns the environment variables used by the client. 

404 

405 Returns: 

406 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE, 

407 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables. 

408 

409 Raises: 

410 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not 

411 any of ["true", "false"]. 

412 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT 

413 is not any of ["auto", "never", "always"]. 

414 """ 

415 use_client_cert = os.getenv( 

416 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" 

417 ).lower() 

418 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower() 

419 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN") 

420 if use_client_cert not in ("true", "false"): 

421 raise ValueError( 

422 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

423 ) 

424 if use_mtls_endpoint not in ("auto", "never", "always"): 

425 raise MutualTLSChannelError( 

426 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

427 ) 

428 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env 

429 

430 @staticmethod 

431 def _get_client_cert_source(provided_cert_source, use_cert_flag): 

432 """Return the client cert source to be used by the client. 

433 

434 Args: 

435 provided_cert_source (bytes): The client certificate source provided. 

436 use_cert_flag (bool): A flag indicating whether to use the client certificate. 

437 

438 Returns: 

439 bytes or None: The client cert source to be used by the client. 

440 """ 

441 client_cert_source = None 

442 if use_cert_flag: 

443 if provided_cert_source: 

444 client_cert_source = provided_cert_source 

445 elif mtls.has_default_client_cert_source(): 

446 client_cert_source = mtls.default_client_cert_source() 

447 return client_cert_source 

448 

449 @staticmethod 

450 def _get_api_endpoint( 

451 api_override, client_cert_source, universe_domain, use_mtls_endpoint 

452 ): 

453 """Return the API endpoint used by the client. 

454 

455 Args: 

456 api_override (str): The API endpoint override. If specified, this is always 

457 the return value of this function and the other arguments are not used. 

458 client_cert_source (bytes): The client certificate source used by the client. 

459 universe_domain (str): The universe domain used by the client. 

460 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters. 

461 Possible values are "always", "auto", or "never". 

462 

463 Returns: 

464 str: The API endpoint to be used by the client. 

465 """ 

466 if api_override is not None: 

467 api_endpoint = api_override 

468 elif use_mtls_endpoint == "always" or ( 

469 use_mtls_endpoint == "auto" and client_cert_source 

470 ): 

471 _default_universe = BigQueryWriteClient._DEFAULT_UNIVERSE 

472 if universe_domain != _default_universe: 

473 raise MutualTLSChannelError( 

474 f"mTLS is not supported in any universe other than {_default_universe}." 

475 ) 

476 api_endpoint = BigQueryWriteClient.DEFAULT_MTLS_ENDPOINT 

477 else: 

478 api_endpoint = BigQueryWriteClient._DEFAULT_ENDPOINT_TEMPLATE.format( 

479 UNIVERSE_DOMAIN=universe_domain 

480 ) 

481 return api_endpoint 

482 

483 @staticmethod 

484 def _get_universe_domain( 

485 client_universe_domain: Optional[str], universe_domain_env: Optional[str] 

486 ) -> str: 

487 """Return the universe domain used by the client. 

488 

489 Args: 

490 client_universe_domain (Optional[str]): The universe domain configured via the client options. 

491 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable. 

492 

493 Returns: 

494 str: The universe domain to be used by the client. 

495 

496 Raises: 

497 ValueError: If the universe domain is an empty string. 

498 """ 

499 universe_domain = BigQueryWriteClient._DEFAULT_UNIVERSE 

500 if client_universe_domain is not None: 

501 universe_domain = client_universe_domain 

502 elif universe_domain_env is not None: 

503 universe_domain = universe_domain_env 

504 if len(universe_domain.strip()) == 0: 

505 raise ValueError("Universe Domain cannot be an empty string.") 

506 return universe_domain 

507 

508 def _validate_universe_domain(self): 

509 """Validates client's and credentials' universe domains are consistent. 

510 

511 Returns: 

512 bool: True iff the configured universe domain is valid. 

513 

514 Raises: 

515 ValueError: If the configured universe domain is not valid. 

516 """ 

517 

518 # NOTE (b/349488459): universe validation is disabled until further notice. 

519 return True 

520 

521 def _add_cred_info_for_auth_errors( 

522 self, error: core_exceptions.GoogleAPICallError 

523 ) -> None: 

524 """Adds credential info string to error details for 401/403/404 errors. 

525 

526 Args: 

527 error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info. 

528 """ 

529 if error.code not in [ 

530 HTTPStatus.UNAUTHORIZED, 

531 HTTPStatus.FORBIDDEN, 

532 HTTPStatus.NOT_FOUND, 

533 ]: 

534 return 

535 

536 cred = self._transport._credentials 

537 

538 # get_cred_info is only available in google-auth>=2.35.0 

539 if not hasattr(cred, "get_cred_info"): 

540 return 

541 

542 # ignore the type check since pypy test fails when get_cred_info 

543 # is not available 

544 cred_info = cred.get_cred_info() # type: ignore 

545 if cred_info and hasattr(error._details, "append"): 

546 error._details.append(json.dumps(cred_info)) 

547 

548 @property 

549 def api_endpoint(self): 

550 """Return the API endpoint used by the client instance. 

551 

552 Returns: 

553 str: The API endpoint used by the client instance. 

554 """ 

555 return self._api_endpoint 

556 

557 @property 

558 def universe_domain(self) -> str: 

559 """Return the universe domain used by the client instance. 

560 

561 Returns: 

562 str: The universe domain used by the client instance. 

563 """ 

564 return self._universe_domain 

565 

566 def __init__( 

567 self, 

568 *, 

569 credentials: Optional[ga_credentials.Credentials] = None, 

570 transport: Optional[ 

571 Union[str, BigQueryWriteTransport, Callable[..., BigQueryWriteTransport]] 

572 ] = None, 

573 client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None, 

574 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

575 ) -> None: 

576 """Instantiates the big query write client. 

577 

578 Args: 

579 credentials (Optional[google.auth.credentials.Credentials]): The 

580 authorization credentials to attach to requests. These 

581 credentials identify the application to the service; if none 

582 are specified, the client will attempt to ascertain the 

583 credentials from the environment. 

584 transport (Optional[Union[str,BigQueryWriteTransport,Callable[..., BigQueryWriteTransport]]]): 

585 The transport to use, or a Callable that constructs and returns a new transport. 

586 If a Callable is given, it will be called with the same set of initialization 

587 arguments as used in the BigQueryWriteTransport constructor. 

588 If set to None, a transport is chosen automatically. 

589 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): 

590 Custom options for the client. 

591 

592 1. The ``api_endpoint`` property can be used to override the 

593 default endpoint provided by the client when ``transport`` is 

594 not explicitly provided. Only if this property is not set and 

595 ``transport`` was not explicitly provided, the endpoint is 

596 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment 

597 variable, which have one of the following values: 

598 "always" (always use the default mTLS endpoint), "never" (always 

599 use the default regular endpoint) and "auto" (auto-switch to the 

600 default mTLS endpoint if client certificate is present; this is 

601 the default value). 

602 

603 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable 

604 is "true", then the ``client_cert_source`` property can be used 

605 to provide a client certificate for mTLS transport. If 

606 not provided, the default SSL client certificate will be used if 

607 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not 

608 set, no client certificate will be used. 

609 

610 3. The ``universe_domain`` property can be used to override the 

611 default "googleapis.com" universe. Note that the ``api_endpoint`` 

612 property still takes precedence; and ``universe_domain`` is 

613 currently not supported for mTLS. 

614 

615 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

616 The client info used to send a user-agent string along with 

617 API requests. If ``None``, then default info will be used. 

618 Generally, you only need to set this if you're developing 

619 your own client library. 

620 

621 Raises: 

622 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport 

623 creation failed for any reason. 

624 """ 

625 self._client_options = client_options 

626 if isinstance(self._client_options, dict): 

627 self._client_options = client_options_lib.from_dict(self._client_options) 

628 if self._client_options is None: 

629 self._client_options = client_options_lib.ClientOptions() 

630 self._client_options = cast( 

631 client_options_lib.ClientOptions, self._client_options 

632 ) 

633 

634 universe_domain_opt = getattr(self._client_options, "universe_domain", None) 

635 

636 ( 

637 self._use_client_cert, 

638 self._use_mtls_endpoint, 

639 self._universe_domain_env, 

640 ) = BigQueryWriteClient._read_environment_variables() 

641 self._client_cert_source = BigQueryWriteClient._get_client_cert_source( 

642 self._client_options.client_cert_source, self._use_client_cert 

643 ) 

644 self._universe_domain = BigQueryWriteClient._get_universe_domain( 

645 universe_domain_opt, self._universe_domain_env 

646 ) 

647 self._api_endpoint = None # updated below, depending on `transport` 

648 

649 # Initialize the universe domain validation. 

650 self._is_universe_domain_valid = False 

651 

652 if CLIENT_LOGGING_SUPPORTED: # pragma: NO COVER 

653 # Setup logging. 

654 client_logging.initialize_logging() 

655 

656 api_key_value = getattr(self._client_options, "api_key", None) 

657 if api_key_value and credentials: 

658 raise ValueError( 

659 "client_options.api_key and credentials are mutually exclusive" 

660 ) 

661 

662 # Save or instantiate the transport. 

663 # Ordinarily, we provide the transport, but allowing a custom transport 

664 # instance provides an extensibility point for unusual situations. 

665 transport_provided = isinstance(transport, BigQueryWriteTransport) 

666 if transport_provided: 

667 # transport is a BigQueryWriteTransport instance. 

668 if credentials or self._client_options.credentials_file or api_key_value: 

669 raise ValueError( 

670 "When providing a transport instance, " 

671 "provide its credentials directly." 

672 ) 

673 if self._client_options.scopes: 

674 raise ValueError( 

675 "When providing a transport instance, provide its scopes " 

676 "directly." 

677 ) 

678 self._transport = cast(BigQueryWriteTransport, transport) 

679 self._api_endpoint = self._transport.host 

680 

681 self._api_endpoint = ( 

682 self._api_endpoint 

683 or BigQueryWriteClient._get_api_endpoint( 

684 self._client_options.api_endpoint, 

685 self._client_cert_source, 

686 self._universe_domain, 

687 self._use_mtls_endpoint, 

688 ) 

689 ) 

690 

691 if not transport_provided: 

692 import google.auth._default # type: ignore 

693 

694 if api_key_value and hasattr( 

695 google.auth._default, "get_api_key_credentials" 

696 ): 

697 credentials = google.auth._default.get_api_key_credentials( 

698 api_key_value 

699 ) 

700 

701 transport_init: Union[ 

702 Type[BigQueryWriteTransport], Callable[..., BigQueryWriteTransport] 

703 ] = ( 

704 BigQueryWriteClient.get_transport_class(transport) 

705 if isinstance(transport, str) or transport is None 

706 else cast(Callable[..., BigQueryWriteTransport], transport) 

707 ) 

708 # initialize with the provided callable or the passed in class 

709 self._transport = transport_init( 

710 credentials=credentials, 

711 credentials_file=self._client_options.credentials_file, 

712 host=self._api_endpoint, 

713 scopes=self._client_options.scopes, 

714 client_cert_source_for_mtls=self._client_cert_source, 

715 quota_project_id=self._client_options.quota_project_id, 

716 client_info=client_info, 

717 always_use_jwt_access=True, 

718 api_audience=self._client_options.api_audience, 

719 ) 

720 

721 if "async" not in str(self._transport): 

722 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

723 std_logging.DEBUG 

724 ): # pragma: NO COVER 

725 _LOGGER.debug( 

726 "Created client `google.cloud.bigquery.storage_v1.BigQueryWriteClient`.", 

727 extra={ 

728 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite", 

729 "universeDomain": getattr( 

730 self._transport._credentials, "universe_domain", "" 

731 ), 

732 "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}", 

733 "credentialsInfo": getattr( 

734 self.transport._credentials, "get_cred_info", lambda: None 

735 )(), 

736 } 

737 if hasattr(self._transport, "_credentials") 

738 else { 

739 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryWrite", 

740 "credentialsType": None, 

741 }, 

742 ) 

743 

744 def create_write_stream( 

745 self, 

746 request: Optional[Union[storage.CreateWriteStreamRequest, dict]] = None, 

747 *, 

748 parent: Optional[str] = None, 

749 write_stream: Optional[stream.WriteStream] = None, 

750 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

751 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

752 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

753 ) -> stream.WriteStream: 

754 r"""Creates a write stream to the given table. Additionally, every 

755 table has a special stream named '_default' to which data can be 

756 written. This stream doesn't need to be created using 

757 CreateWriteStream. It is a stream that can be used 

758 simultaneously by any number of clients. Data written to this 

759 stream is considered committed as soon as an acknowledgement is 

760 received. 

761 

762 .. code-block:: python 

763 

764 # This snippet has been automatically generated and should be regarded as a 

765 # code template only. 

766 # It will require modifications to work: 

767 # - It may require correct/in-range values for request initialization. 

768 # - It may require specifying regional endpoints when creating the service 

769 # client as shown in: 

770 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

771 from google.cloud import bigquery_storage_v1 

772 

773 def sample_create_write_stream(): 

774 # Create a client 

775 client = bigquery_storage_v1.BigQueryWriteClient() 

776 

777 # Initialize request argument(s) 

778 request = bigquery_storage_v1.CreateWriteStreamRequest( 

779 parent="parent_value", 

780 ) 

781 

782 # Make the request 

783 response = client.create_write_stream(request=request) 

784 

785 # Handle the response 

786 print(response) 

787 

788 Args: 

789 request (Union[google.cloud.bigquery_storage_v1.types.CreateWriteStreamRequest, dict]): 

790 The request object. Request message for ``CreateWriteStream``. 

791 parent (str): 

792 Required. Reference to the table to which the stream 

793 belongs, in the format of 

794 ``projects/{project}/datasets/{dataset}/tables/{table}``. 

795 

796 This corresponds to the ``parent`` field 

797 on the ``request`` instance; if ``request`` is provided, this 

798 should not be set. 

799 write_stream (google.cloud.bigquery_storage_v1.types.WriteStream): 

800 Required. Stream to be created. 

801 This corresponds to the ``write_stream`` field 

802 on the ``request`` instance; if ``request`` is provided, this 

803 should not be set. 

804 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

805 should be retried. 

806 timeout (float): The timeout for this request. 

807 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

808 sent along with the request as metadata. Normally, each value must be of type `str`, 

809 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

810 be of type `bytes`. 

811 

812 Returns: 

813 google.cloud.bigquery_storage_v1.types.WriteStream: 

814 Information about a single stream 

815 that gets data inside the storage 

816 system. 

817 

818 """ 

819 # Create or coerce a protobuf request object. 

820 # - Quick check: If we got a request object, we should *not* have 

821 # gotten any keyword arguments that map to the request. 

822 flattened_params = [parent, write_stream] 

823 has_flattened_params = ( 

824 len([param for param in flattened_params if param is not None]) > 0 

825 ) 

826 if request is not None and has_flattened_params: 

827 raise ValueError( 

828 "If the `request` argument is set, then none of " 

829 "the individual field arguments should be set." 

830 ) 

831 

832 # - Use the request object if provided (there's no risk of modifying the input as 

833 # there are no flattened fields), or create one. 

834 if not isinstance(request, storage.CreateWriteStreamRequest): 

835 request = storage.CreateWriteStreamRequest(request) 

836 # If we have keyword arguments corresponding to fields on the 

837 # request, apply these. 

838 if parent is not None: 

839 request.parent = parent 

840 if write_stream is not None: 

841 request.write_stream = write_stream 

842 

843 # Wrap the RPC method; this adds retry and timeout information, 

844 # and friendly error handling. 

845 rpc = self._transport._wrapped_methods[self._transport.create_write_stream] 

846 

847 # Certain fields should be provided within the metadata header; 

848 # add these here. 

849 metadata = tuple(metadata) + ( 

850 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)), 

851 ) 

852 

853 # Validate the universe domain. 

854 self._validate_universe_domain() 

855 

856 # Send the request. 

857 response = rpc( 

858 request, 

859 retry=retry, 

860 timeout=timeout, 

861 metadata=metadata, 

862 ) 

863 

864 # Done; return the response. 

865 return response 

866 

867 def append_rows( 

868 self, 

869 requests: Optional[Iterator[storage.AppendRowsRequest]] = None, 

870 *, 

871 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

872 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

873 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

874 ) -> Iterable[storage.AppendRowsResponse]: 

875 r"""Appends data to the given stream. 

876 

877 If ``offset`` is specified, the ``offset`` is checked against 

878 the end of stream. The server returns ``OUT_OF_RANGE`` in 

879 ``AppendRowsResponse`` if an attempt is made to append to an 

880 offset beyond the current end of the stream or 

881 ``ALREADY_EXISTS`` if user provides an ``offset`` that has 

882 already been written to. User can retry with adjusted offset 

883 within the same RPC connection. If ``offset`` is not specified, 

884 append happens at the end of the stream. 

885 

886 The response contains an optional offset at which the append 

887 happened. No offset information will be returned for appends to 

888 a default stream. 

889 

890 Responses are received in the same order in which requests are 

891 sent. There will be one response for each successful inserted 

892 request. Responses may optionally embed error information if the 

893 originating AppendRequest was not successfully processed. 

894 

895 The specifics of when successfully appended data is made visible 

896 to the table are governed by the type of stream: 

897 

898 - For COMMITTED streams (which includes the default stream), 

899 data is visible immediately upon successful append. 

900 

901 - For BUFFERED streams, data is made visible via a subsequent 

902 ``FlushRows`` rpc which advances a cursor to a newer offset 

903 in the stream. 

904 

905 - For PENDING streams, data is not made visible until the 

906 stream itself is finalized (via the ``FinalizeWriteStream`` 

907 rpc), and the stream is explicitly committed via the 

908 ``BatchCommitWriteStreams`` rpc. 

909 

910 .. code-block:: python 

911 

912 # This snippet has been automatically generated and should be regarded as a 

913 # code template only. 

914 # It will require modifications to work: 

915 # - It may require correct/in-range values for request initialization. 

916 # - It may require specifying regional endpoints when creating the service 

917 # client as shown in: 

918 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

919 from google.cloud import bigquery_storage_v1 

920 

921 def sample_append_rows(): 

922 # Create a client 

923 client = bigquery_storage_v1.BigQueryWriteClient() 

924 

925 # Initialize request argument(s) 

926 request = bigquery_storage_v1.AppendRowsRequest( 

927 write_stream="write_stream_value", 

928 ) 

929 

930 # This method expects an iterator which contains 

931 # 'bigquery_storage_v1.AppendRowsRequest' objects 

932 # Here we create a generator that yields a single `request` for 

933 # demonstrative purposes. 

934 requests = [request] 

935 

936 def request_generator(): 

937 for request in requests: 

938 yield request 

939 

940 # Make the request 

941 stream = client.append_rows(requests=request_generator()) 

942 

943 # Handle the response 

944 for response in stream: 

945 print(response) 

946 

947 Args: 

948 requests (Iterator[google.cloud.bigquery_storage_v1.types.AppendRowsRequest]): 

949 The request object iterator. Request message for ``AppendRows``. 

950 

951 Because AppendRows is a bidirectional streaming RPC, 

952 certain parts of the AppendRowsRequest need only be 

953 specified for the first request before switching table 

954 destinations. You can also switch table destinations 

955 within the same connection for the default stream. 

956 

957 The size of a single AppendRowsRequest must be less than 

958 10 MB in size. Requests larger than this return an 

959 error, typically ``INVALID_ARGUMENT``. 

960 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

961 should be retried. 

962 timeout (float): The timeout for this request. 

963 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

964 sent along with the request as metadata. Normally, each value must be of type `str`, 

965 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

966 be of type `bytes`. 

967 

968 Returns: 

969 Iterable[google.cloud.bigquery_storage_v1.types.AppendRowsResponse]: 

970 Response message for AppendRows. 

971 """ 

972 

973 # Wrap the RPC method; this adds retry and timeout information, 

974 # and friendly error handling. 

975 rpc = self._transport._wrapped_methods[self._transport.append_rows] 

976 

977 # Certain fields should be provided within the metadata header; 

978 # add these here. 

979 metadata = tuple(metadata) + (gapic_v1.routing_header.to_grpc_metadata(()),) 

980 

981 # Validate the universe domain. 

982 self._validate_universe_domain() 

983 

984 # Send the request. 

985 response = rpc( 

986 requests, 

987 retry=retry, 

988 timeout=timeout, 

989 metadata=metadata, 

990 ) 

991 

992 # Done; return the response. 

993 return response 

994 

995 def get_write_stream( 

996 self, 

997 request: Optional[Union[storage.GetWriteStreamRequest, dict]] = None, 

998 *, 

999 name: Optional[str] = None, 

1000 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1001 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1002 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

1003 ) -> stream.WriteStream: 

1004 r"""Gets information about a write stream. 

1005 

1006 .. code-block:: python 

1007 

1008 # This snippet has been automatically generated and should be regarded as a 

1009 # code template only. 

1010 # It will require modifications to work: 

1011 # - It may require correct/in-range values for request initialization. 

1012 # - It may require specifying regional endpoints when creating the service 

1013 # client as shown in: 

1014 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1015 from google.cloud import bigquery_storage_v1 

1016 

1017 def sample_get_write_stream(): 

1018 # Create a client 

1019 client = bigquery_storage_v1.BigQueryWriteClient() 

1020 

1021 # Initialize request argument(s) 

1022 request = bigquery_storage_v1.GetWriteStreamRequest( 

1023 name="name_value", 

1024 ) 

1025 

1026 # Make the request 

1027 response = client.get_write_stream(request=request) 

1028 

1029 # Handle the response 

1030 print(response) 

1031 

1032 Args: 

1033 request (Union[google.cloud.bigquery_storage_v1.types.GetWriteStreamRequest, dict]): 

1034 The request object. Request message for ``GetWriteStreamRequest``. 

1035 name (str): 

1036 Required. Name of the stream to get, in the form of 

1037 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``. 

1038 

1039 This corresponds to the ``name`` field 

1040 on the ``request`` instance; if ``request`` is provided, this 

1041 should not be set. 

1042 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1043 should be retried. 

1044 timeout (float): The timeout for this request. 

1045 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

1046 sent along with the request as metadata. Normally, each value must be of type `str`, 

1047 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

1048 be of type `bytes`. 

1049 

1050 Returns: 

1051 google.cloud.bigquery_storage_v1.types.WriteStream: 

1052 Information about a single stream 

1053 that gets data inside the storage 

1054 system. 

1055 

1056 """ 

1057 # Create or coerce a protobuf request object. 

1058 # - Quick check: If we got a request object, we should *not* have 

1059 # gotten any keyword arguments that map to the request. 

1060 flattened_params = [name] 

1061 has_flattened_params = ( 

1062 len([param for param in flattened_params if param is not None]) > 0 

1063 ) 

1064 if request is not None and has_flattened_params: 

1065 raise ValueError( 

1066 "If the `request` argument is set, then none of " 

1067 "the individual field arguments should be set." 

1068 ) 

1069 

1070 # - Use the request object if provided (there's no risk of modifying the input as 

1071 # there are no flattened fields), or create one. 

1072 if not isinstance(request, storage.GetWriteStreamRequest): 

1073 request = storage.GetWriteStreamRequest(request) 

1074 # If we have keyword arguments corresponding to fields on the 

1075 # request, apply these. 

1076 if name is not None: 

1077 request.name = name 

1078 

1079 # Wrap the RPC method; this adds retry and timeout information, 

1080 # and friendly error handling. 

1081 rpc = self._transport._wrapped_methods[self._transport.get_write_stream] 

1082 

1083 # Certain fields should be provided within the metadata header; 

1084 # add these here. 

1085 metadata = tuple(metadata) + ( 

1086 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

1087 ) 

1088 

1089 # Validate the universe domain. 

1090 self._validate_universe_domain() 

1091 

1092 # Send the request. 

1093 response = rpc( 

1094 request, 

1095 retry=retry, 

1096 timeout=timeout, 

1097 metadata=metadata, 

1098 ) 

1099 

1100 # Done; return the response. 

1101 return response 

1102 

1103 def finalize_write_stream( 

1104 self, 

1105 request: Optional[Union[storage.FinalizeWriteStreamRequest, dict]] = None, 

1106 *, 

1107 name: Optional[str] = None, 

1108 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1109 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1110 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

1111 ) -> storage.FinalizeWriteStreamResponse: 

1112 r"""Finalize a write stream so that no new data can be appended to 

1113 the stream. Finalize is not supported on the '_default' stream. 

1114 

1115 .. code-block:: python 

1116 

1117 # This snippet has been automatically generated and should be regarded as a 

1118 # code template only. 

1119 # It will require modifications to work: 

1120 # - It may require correct/in-range values for request initialization. 

1121 # - It may require specifying regional endpoints when creating the service 

1122 # client as shown in: 

1123 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1124 from google.cloud import bigquery_storage_v1 

1125 

1126 def sample_finalize_write_stream(): 

1127 # Create a client 

1128 client = bigquery_storage_v1.BigQueryWriteClient() 

1129 

1130 # Initialize request argument(s) 

1131 request = bigquery_storage_v1.FinalizeWriteStreamRequest( 

1132 name="name_value", 

1133 ) 

1134 

1135 # Make the request 

1136 response = client.finalize_write_stream(request=request) 

1137 

1138 # Handle the response 

1139 print(response) 

1140 

1141 Args: 

1142 request (Union[google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamRequest, dict]): 

1143 The request object. Request message for invoking ``FinalizeWriteStream``. 

1144 name (str): 

1145 Required. Name of the stream to finalize, in the form of 

1146 ``projects/{project}/datasets/{dataset}/tables/{table}/streams/{stream}``. 

1147 

1148 This corresponds to the ``name`` field 

1149 on the ``request`` instance; if ``request`` is provided, this 

1150 should not be set. 

1151 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1152 should be retried. 

1153 timeout (float): The timeout for this request. 

1154 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

1155 sent along with the request as metadata. Normally, each value must be of type `str`, 

1156 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

1157 be of type `bytes`. 

1158 

1159 Returns: 

1160 google.cloud.bigquery_storage_v1.types.FinalizeWriteStreamResponse: 

1161 Response message for FinalizeWriteStream. 

1162 """ 

1163 # Create or coerce a protobuf request object. 

1164 # - Quick check: If we got a request object, we should *not* have 

1165 # gotten any keyword arguments that map to the request. 

1166 flattened_params = [name] 

1167 has_flattened_params = ( 

1168 len([param for param in flattened_params if param is not None]) > 0 

1169 ) 

1170 if request is not None and has_flattened_params: 

1171 raise ValueError( 

1172 "If the `request` argument is set, then none of " 

1173 "the individual field arguments should be set." 

1174 ) 

1175 

1176 # - Use the request object if provided (there's no risk of modifying the input as 

1177 # there are no flattened fields), or create one. 

1178 if not isinstance(request, storage.FinalizeWriteStreamRequest): 

1179 request = storage.FinalizeWriteStreamRequest(request) 

1180 # If we have keyword arguments corresponding to fields on the 

1181 # request, apply these. 

1182 if name is not None: 

1183 request.name = name 

1184 

1185 # Wrap the RPC method; this adds retry and timeout information, 

1186 # and friendly error handling. 

1187 rpc = self._transport._wrapped_methods[self._transport.finalize_write_stream] 

1188 

1189 # Certain fields should be provided within the metadata header; 

1190 # add these here. 

1191 metadata = tuple(metadata) + ( 

1192 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

1193 ) 

1194 

1195 # Validate the universe domain. 

1196 self._validate_universe_domain() 

1197 

1198 # Send the request. 

1199 response = rpc( 

1200 request, 

1201 retry=retry, 

1202 timeout=timeout, 

1203 metadata=metadata, 

1204 ) 

1205 

1206 # Done; return the response. 

1207 return response 

1208 

1209 def batch_commit_write_streams( 

1210 self, 

1211 request: Optional[Union[storage.BatchCommitWriteStreamsRequest, dict]] = None, 

1212 *, 

1213 parent: Optional[str] = None, 

1214 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1215 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1216 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

1217 ) -> storage.BatchCommitWriteStreamsResponse: 

1218 r"""Atomically commits a group of ``PENDING`` streams that belong to 

1219 the same ``parent`` table. 

1220 

1221 Streams must be finalized before commit and cannot be committed 

1222 multiple times. Once a stream is committed, data in the stream 

1223 becomes available for read operations. 

1224 

1225 .. code-block:: python 

1226 

1227 # This snippet has been automatically generated and should be regarded as a 

1228 # code template only. 

1229 # It will require modifications to work: 

1230 # - It may require correct/in-range values for request initialization. 

1231 # - It may require specifying regional endpoints when creating the service 

1232 # client as shown in: 

1233 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1234 from google.cloud import bigquery_storage_v1 

1235 

1236 def sample_batch_commit_write_streams(): 

1237 # Create a client 

1238 client = bigquery_storage_v1.BigQueryWriteClient() 

1239 

1240 # Initialize request argument(s) 

1241 request = bigquery_storage_v1.BatchCommitWriteStreamsRequest( 

1242 parent="parent_value", 

1243 write_streams=['write_streams_value1', 'write_streams_value2'], 

1244 ) 

1245 

1246 # Make the request 

1247 response = client.batch_commit_write_streams(request=request) 

1248 

1249 # Handle the response 

1250 print(response) 

1251 

1252 Args: 

1253 request (Union[google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsRequest, dict]): 

1254 The request object. Request message for ``BatchCommitWriteStreams``. 

1255 parent (str): 

1256 Required. Parent table that all the streams should 

1257 belong to, in the form of 

1258 ``projects/{project}/datasets/{dataset}/tables/{table}``. 

1259 

1260 This corresponds to the ``parent`` field 

1261 on the ``request`` instance; if ``request`` is provided, this 

1262 should not be set. 

1263 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1264 should be retried. 

1265 timeout (float): The timeout for this request. 

1266 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

1267 sent along with the request as metadata. Normally, each value must be of type `str`, 

1268 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

1269 be of type `bytes`. 

1270 

1271 Returns: 

1272 google.cloud.bigquery_storage_v1.types.BatchCommitWriteStreamsResponse: 

1273 Response message for BatchCommitWriteStreams. 

1274 """ 

1275 # Create or coerce a protobuf request object. 

1276 # - Quick check: If we got a request object, we should *not* have 

1277 # gotten any keyword arguments that map to the request. 

1278 flattened_params = [parent] 

1279 has_flattened_params = ( 

1280 len([param for param in flattened_params if param is not None]) > 0 

1281 ) 

1282 if request is not None and has_flattened_params: 

1283 raise ValueError( 

1284 "If the `request` argument is set, then none of " 

1285 "the individual field arguments should be set." 

1286 ) 

1287 

1288 # - Use the request object if provided (there's no risk of modifying the input as 

1289 # there are no flattened fields), or create one. 

1290 if not isinstance(request, storage.BatchCommitWriteStreamsRequest): 

1291 request = storage.BatchCommitWriteStreamsRequest(request) 

1292 # If we have keyword arguments corresponding to fields on the 

1293 # request, apply these. 

1294 if parent is not None: 

1295 request.parent = parent 

1296 

1297 # Wrap the RPC method; this adds retry and timeout information, 

1298 # and friendly error handling. 

1299 rpc = self._transport._wrapped_methods[ 

1300 self._transport.batch_commit_write_streams 

1301 ] 

1302 

1303 # Certain fields should be provided within the metadata header; 

1304 # add these here. 

1305 metadata = tuple(metadata) + ( 

1306 gapic_v1.routing_header.to_grpc_metadata((("parent", request.parent),)), 

1307 ) 

1308 

1309 # Validate the universe domain. 

1310 self._validate_universe_domain() 

1311 

1312 # Send the request. 

1313 response = rpc( 

1314 request, 

1315 retry=retry, 

1316 timeout=timeout, 

1317 metadata=metadata, 

1318 ) 

1319 

1320 # Done; return the response. 

1321 return response 

1322 

1323 def flush_rows( 

1324 self, 

1325 request: Optional[Union[storage.FlushRowsRequest, dict]] = None, 

1326 *, 

1327 write_stream: Optional[str] = None, 

1328 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1329 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1330 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

1331 ) -> storage.FlushRowsResponse: 

1332 r"""Flushes rows to a BUFFERED stream. 

1333 

1334 If users are appending rows to BUFFERED stream, flush operation 

1335 is required in order for the rows to become available for 

1336 reading. A Flush operation flushes up to any previously flushed 

1337 offset in a BUFFERED stream, to the offset specified in the 

1338 request. 

1339 

1340 Flush is not supported on the \_default stream, since it is not 

1341 BUFFERED. 

1342 

1343 .. code-block:: python 

1344 

1345 # This snippet has been automatically generated and should be regarded as a 

1346 # code template only. 

1347 # It will require modifications to work: 

1348 # - It may require correct/in-range values for request initialization. 

1349 # - It may require specifying regional endpoints when creating the service 

1350 # client as shown in: 

1351 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1352 from google.cloud import bigquery_storage_v1 

1353 

1354 def sample_flush_rows(): 

1355 # Create a client 

1356 client = bigquery_storage_v1.BigQueryWriteClient() 

1357 

1358 # Initialize request argument(s) 

1359 request = bigquery_storage_v1.FlushRowsRequest( 

1360 write_stream="write_stream_value", 

1361 ) 

1362 

1363 # Make the request 

1364 response = client.flush_rows(request=request) 

1365 

1366 # Handle the response 

1367 print(response) 

1368 

1369 Args: 

1370 request (Union[google.cloud.bigquery_storage_v1.types.FlushRowsRequest, dict]): 

1371 The request object. Request message for ``FlushRows``. 

1372 write_stream (str): 

1373 Required. The stream that is the 

1374 target of the flush operation. 

1375 

1376 This corresponds to the ``write_stream`` field 

1377 on the ``request`` instance; if ``request`` is provided, this 

1378 should not be set. 

1379 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1380 should be retried. 

1381 timeout (float): The timeout for this request. 

1382 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

1383 sent along with the request as metadata. Normally, each value must be of type `str`, 

1384 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

1385 be of type `bytes`. 

1386 

1387 Returns: 

1388 google.cloud.bigquery_storage_v1.types.FlushRowsResponse: 

1389 Respond message for FlushRows. 

1390 """ 

1391 # Create or coerce a protobuf request object. 

1392 # - Quick check: If we got a request object, we should *not* have 

1393 # gotten any keyword arguments that map to the request. 

1394 flattened_params = [write_stream] 

1395 has_flattened_params = ( 

1396 len([param for param in flattened_params if param is not None]) > 0 

1397 ) 

1398 if request is not None and has_flattened_params: 

1399 raise ValueError( 

1400 "If the `request` argument is set, then none of " 

1401 "the individual field arguments should be set." 

1402 ) 

1403 

1404 # - Use the request object if provided (there's no risk of modifying the input as 

1405 # there are no flattened fields), or create one. 

1406 if not isinstance(request, storage.FlushRowsRequest): 

1407 request = storage.FlushRowsRequest(request) 

1408 # If we have keyword arguments corresponding to fields on the 

1409 # request, apply these. 

1410 if write_stream is not None: 

1411 request.write_stream = write_stream 

1412 

1413 # Wrap the RPC method; this adds retry and timeout information, 

1414 # and friendly error handling. 

1415 rpc = self._transport._wrapped_methods[self._transport.flush_rows] 

1416 

1417 # Certain fields should be provided within the metadata header; 

1418 # add these here. 

1419 metadata = tuple(metadata) + ( 

1420 gapic_v1.routing_header.to_grpc_metadata( 

1421 (("write_stream", request.write_stream),) 

1422 ), 

1423 ) 

1424 

1425 # Validate the universe domain. 

1426 self._validate_universe_domain() 

1427 

1428 # Send the request. 

1429 response = rpc( 

1430 request, 

1431 retry=retry, 

1432 timeout=timeout, 

1433 metadata=metadata, 

1434 ) 

1435 

1436 # Done; return the response. 

1437 return response 

1438 

1439 def __enter__(self) -> "BigQueryWriteClient": 

1440 return self 

1441 

1442 def __exit__(self, type, value, traceback): 

1443 """Releases underlying transport's resources. 

1444 

1445 .. warning:: 

1446 ONLY use as a context manager if the transport is NOT shared 

1447 with other clients! Exiting the with block will CLOSE the transport 

1448 and may cause errors in other clients! 

1449 """ 

1450 self.transport.close() 

1451 

1452 

1453DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

1454 gapic_version=package_version.__version__ 

1455) 

1456 

1457if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

1458 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

1459 

1460__all__ = ("BigQueryWriteClient",)