Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py: 39%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

285 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17from http import HTTPStatus 

18import json 

19import logging as std_logging 

20import os 

21import re 

22from typing import ( 

23 Callable, 

24 Dict, 

25 Iterable, 

26 Mapping, 

27 MutableMapping, 

28 MutableSequence, 

29 Optional, 

30 Sequence, 

31 Tuple, 

32 Type, 

33 Union, 

34 cast, 

35) 

36import warnings 

37 

38from google.api_core import client_options as client_options_lib 

39from google.api_core import exceptions as core_exceptions 

40from google.api_core import gapic_v1 

41from google.api_core import retry as retries 

42from google.auth import credentials as ga_credentials # type: ignore 

43from google.auth.exceptions import MutualTLSChannelError # type: ignore 

44from google.auth.transport import mtls # type: ignore 

45from google.auth.transport.grpc import SslCredentials # type: ignore 

46from google.oauth2 import service_account # type: ignore 

47import google.protobuf 

48 

49from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

50 

try:
    # Newer google-api-core exposes _MethodDefault, letting `retry` parameters
    # distinguish "not passed" from an explicit None.
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError:  # pragma: NO COVER
    # Older google-api-core without _MethodDefault: fall back to a plain
    # object sentinel in the union.
    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore

try:
    # client_logging is only shipped by newer google-api-core releases.
    from google.api_core import client_logging  # type: ignore

    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False

# Module-level logger used for structured client-creation debug logging below.
_LOGGER = std_logging.getLogger(__name__)

64 

65from google.protobuf import timestamp_pb2 # type: ignore 

66 

67from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream 

68 

69from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport 

70from .transports.grpc import BigQueryReadGrpcTransport 

71from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport 

72 

73 

class BigQueryReadClientMeta(type):
    """Metaclass for the BigQueryRead client.

    Exposes class-level helpers for building and retrieving support
    objects (e.g. transport) without adding state to client instances.
    """

    _transport_registry = OrderedDict(
        [
            ("grpc", BigQueryReadGrpcTransport),
            ("grpc_asyncio", BigQueryReadGrpcAsyncIOTransport),
        ]
    )  # type: Dict[str, Type[BigQueryReadTransport]]

    def get_transport_class(
        cls,
        label: Optional[str] = None,
    ) -> Type[BigQueryReadTransport]:
        """Return an appropriate transport class.

        Args:
            label: The name of the desired transport. When omitted, the
                first transport in the registry is used.

        Returns:
            The transport class to use.
        """
        registry = cls._transport_registry
        if not label:
            # No explicit request: default to the first registered transport
            # (registry preserves insertion order).
            return next(iter(registry.values()))
        # A specific transport was requested by name.
        return registry[label]

106 

107 

108class BigQueryReadClient(metaclass=BigQueryReadClientMeta): 

109 """BigQuery Read API. 

110 

111 The Read API can be used to read data from BigQuery. 

112 """ 

113 

114 @staticmethod 

115 def _get_default_mtls_endpoint(api_endpoint): 

116 """Converts api endpoint to mTLS endpoint. 

117 

118 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to 

119 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively. 

120 Args: 

121 api_endpoint (Optional[str]): the api endpoint to convert. 

122 Returns: 

123 str: converted mTLS api endpoint. 

124 """ 

125 if not api_endpoint: 

126 return api_endpoint 

127 

128 mtls_endpoint_re = re.compile( 

129 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?" 

130 ) 

131 

132 m = mtls_endpoint_re.match(api_endpoint) 

133 name, mtls, sandbox, googledomain = m.groups() 

134 if mtls or not googledomain: 

135 return api_endpoint 

136 

137 if sandbox: 

138 return api_endpoint.replace( 

139 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com" 

140 ) 

141 

142 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com") 

143 

    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
    # Computed once at class-creation time from DEFAULT_ENDPOINT; .__func__
    # unwraps the staticmethod so it can be called inside the class body.
    DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__(  # type: ignore
        DEFAULT_ENDPOINT
    )

    # Endpoint template: the universe domain is substituted in by
    # _get_api_endpoint when no override or mTLS endpoint applies.
    _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}"
    # Default Google universe domain.
    _DEFAULT_UNIVERSE = "googleapis.com"

152 

153 @classmethod 

154 def from_service_account_info(cls, info: dict, *args, **kwargs): 

155 """Creates an instance of this client using the provided credentials 

156 info. 

157 

158 Args: 

159 info (dict): The service account private key info. 

160 args: Additional arguments to pass to the constructor. 

161 kwargs: Additional arguments to pass to the constructor. 

162 

163 Returns: 

164 BigQueryReadClient: The constructed client. 

165 """ 

166 credentials = service_account.Credentials.from_service_account_info(info) 

167 kwargs["credentials"] = credentials 

168 return cls(*args, **kwargs) 

169 

170 @classmethod 

171 def from_service_account_file(cls, filename: str, *args, **kwargs): 

172 """Creates an instance of this client using the provided credentials 

173 file. 

174 

175 Args: 

176 filename (str): The path to the service account private key json 

177 file. 

178 args: Additional arguments to pass to the constructor. 

179 kwargs: Additional arguments to pass to the constructor. 

180 

181 Returns: 

182 BigQueryReadClient: The constructed client. 

183 """ 

184 credentials = service_account.Credentials.from_service_account_file(filename) 

185 kwargs["credentials"] = credentials 

186 return cls(*args, **kwargs) 

187 

188 from_service_account_json = from_service_account_file 

189 

190 @property 

191 def transport(self) -> BigQueryReadTransport: 

192 """Returns the transport used by the client instance. 

193 

194 Returns: 

195 BigQueryReadTransport: The transport used by the client 

196 instance. 

197 """ 

198 return self._transport 

199 

200 @staticmethod 

201 def read_session_path( 

202 project: str, 

203 location: str, 

204 session: str, 

205 ) -> str: 

206 """Returns a fully-qualified read_session string.""" 

207 return "projects/{project}/locations/{location}/sessions/{session}".format( 

208 project=project, 

209 location=location, 

210 session=session, 

211 ) 

212 

213 @staticmethod 

214 def parse_read_session_path(path: str) -> Dict[str, str]: 

215 """Parses a read_session path into its component segments.""" 

216 m = re.match( 

217 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$", 

218 path, 

219 ) 

220 return m.groupdict() if m else {} 

221 

222 @staticmethod 

223 def read_stream_path( 

224 project: str, 

225 location: str, 

226 session: str, 

227 stream: str, 

228 ) -> str: 

229 """Returns a fully-qualified read_stream string.""" 

230 return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format( 

231 project=project, 

232 location=location, 

233 session=session, 

234 stream=stream, 

235 ) 

236 

237 @staticmethod 

238 def parse_read_stream_path(path: str) -> Dict[str, str]: 

239 """Parses a read_stream path into its component segments.""" 

240 m = re.match( 

241 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$", 

242 path, 

243 ) 

244 return m.groupdict() if m else {} 

245 

246 @staticmethod 

247 def table_path( 

248 project: str, 

249 dataset: str, 

250 table: str, 

251 ) -> str: 

252 """Returns a fully-qualified table string.""" 

253 return "projects/{project}/datasets/{dataset}/tables/{table}".format( 

254 project=project, 

255 dataset=dataset, 

256 table=table, 

257 ) 

258 

259 @staticmethod 

260 def parse_table_path(path: str) -> Dict[str, str]: 

261 """Parses a table path into its component segments.""" 

262 m = re.match( 

263 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$", 

264 path, 

265 ) 

266 return m.groupdict() if m else {} 

267 

268 @staticmethod 

269 def common_billing_account_path( 

270 billing_account: str, 

271 ) -> str: 

272 """Returns a fully-qualified billing_account string.""" 

273 return "billingAccounts/{billing_account}".format( 

274 billing_account=billing_account, 

275 ) 

276 

277 @staticmethod 

278 def parse_common_billing_account_path(path: str) -> Dict[str, str]: 

279 """Parse a billing_account path into its component segments.""" 

280 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path) 

281 return m.groupdict() if m else {} 

282 

283 @staticmethod 

284 def common_folder_path( 

285 folder: str, 

286 ) -> str: 

287 """Returns a fully-qualified folder string.""" 

288 return "folders/{folder}".format( 

289 folder=folder, 

290 ) 

291 

292 @staticmethod 

293 def parse_common_folder_path(path: str) -> Dict[str, str]: 

294 """Parse a folder path into its component segments.""" 

295 m = re.match(r"^folders/(?P<folder>.+?)$", path) 

296 return m.groupdict() if m else {} 

297 

298 @staticmethod 

299 def common_organization_path( 

300 organization: str, 

301 ) -> str: 

302 """Returns a fully-qualified organization string.""" 

303 return "organizations/{organization}".format( 

304 organization=organization, 

305 ) 

306 

307 @staticmethod 

308 def parse_common_organization_path(path: str) -> Dict[str, str]: 

309 """Parse a organization path into its component segments.""" 

310 m = re.match(r"^organizations/(?P<organization>.+?)$", path) 

311 return m.groupdict() if m else {} 

312 

313 @staticmethod 

314 def common_project_path( 

315 project: str, 

316 ) -> str: 

317 """Returns a fully-qualified project string.""" 

318 return "projects/{project}".format( 

319 project=project, 

320 ) 

321 

322 @staticmethod 

323 def parse_common_project_path(path: str) -> Dict[str, str]: 

324 """Parse a project path into its component segments.""" 

325 m = re.match(r"^projects/(?P<project>.+?)$", path) 

326 return m.groupdict() if m else {} 

327 

328 @staticmethod 

329 def common_location_path( 

330 project: str, 

331 location: str, 

332 ) -> str: 

333 """Returns a fully-qualified location string.""" 

334 return "projects/{project}/locations/{location}".format( 

335 project=project, 

336 location=location, 

337 ) 

338 

339 @staticmethod 

340 def parse_common_location_path(path: str) -> Dict[str, str]: 

341 """Parse a location path into its component segments.""" 

342 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path) 

343 return m.groupdict() if m else {} 

344 

345 @classmethod 

346 def get_mtls_endpoint_and_cert_source( 

347 cls, client_options: Optional[client_options_lib.ClientOptions] = None 

348 ): 

349 """Deprecated. Return the API endpoint and client cert source for mutual TLS. 

350 

351 The client cert source is determined in the following order: 

352 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the 

353 client cert source is None. 

354 (2) if `client_options.client_cert_source` is provided, use the provided one; if the 

355 default client cert source exists, use the default one; otherwise the client cert 

356 source is None. 

357 

358 The API endpoint is determined in the following order: 

359 (1) if `client_options.api_endpoint` if provided, use the provided one. 

360 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the 

361 default mTLS endpoint; if the environment variable is "never", use the default API 

362 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise 

363 use the default API endpoint. 

364 

365 More details can be found at https://google.aip.dev/auth/4114. 

366 

367 Args: 

368 client_options (google.api_core.client_options.ClientOptions): Custom options for the 

369 client. Only the `api_endpoint` and `client_cert_source` properties may be used 

370 in this method. 

371 

372 Returns: 

373 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the 

374 client cert source to use. 

375 

376 Raises: 

377 google.auth.exceptions.MutualTLSChannelError: If any errors happen. 

378 """ 

379 

380 warnings.warn( 

381 "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.", 

382 DeprecationWarning, 

383 ) 

384 if client_options is None: 

385 client_options = client_options_lib.ClientOptions() 

386 use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") 

387 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto") 

388 if use_client_cert not in ("true", "false"): 

389 raise ValueError( 

390 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

391 ) 

392 if use_mtls_endpoint not in ("auto", "never", "always"): 

393 raise MutualTLSChannelError( 

394 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

395 ) 

396 

397 # Figure out the client cert source to use. 

398 client_cert_source = None 

399 if use_client_cert == "true": 

400 if client_options.client_cert_source: 

401 client_cert_source = client_options.client_cert_source 

402 elif mtls.has_default_client_cert_source(): 

403 client_cert_source = mtls.default_client_cert_source() 

404 

405 # Figure out which api endpoint to use. 

406 if client_options.api_endpoint is not None: 

407 api_endpoint = client_options.api_endpoint 

408 elif use_mtls_endpoint == "always" or ( 

409 use_mtls_endpoint == "auto" and client_cert_source 

410 ): 

411 api_endpoint = cls.DEFAULT_MTLS_ENDPOINT 

412 else: 

413 api_endpoint = cls.DEFAULT_ENDPOINT 

414 

415 return api_endpoint, client_cert_source 

416 

417 @staticmethod 

418 def _read_environment_variables(): 

419 """Returns the environment variables used by the client. 

420 

421 Returns: 

422 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE, 

423 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables. 

424 

425 Raises: 

426 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not 

427 any of ["true", "false"]. 

428 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT 

429 is not any of ["auto", "never", "always"]. 

430 """ 

431 use_client_cert = os.getenv( 

432 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" 

433 ).lower() 

434 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower() 

435 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN") 

436 if use_client_cert not in ("true", "false"): 

437 raise ValueError( 

438 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

439 ) 

440 if use_mtls_endpoint not in ("auto", "never", "always"): 

441 raise MutualTLSChannelError( 

442 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

443 ) 

444 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env 

445 

446 @staticmethod 

447 def _get_client_cert_source(provided_cert_source, use_cert_flag): 

448 """Return the client cert source to be used by the client. 

449 

450 Args: 

451 provided_cert_source (bytes): The client certificate source provided. 

452 use_cert_flag (bool): A flag indicating whether to use the client certificate. 

453 

454 Returns: 

455 bytes or None: The client cert source to be used by the client. 

456 """ 

457 client_cert_source = None 

458 if use_cert_flag: 

459 if provided_cert_source: 

460 client_cert_source = provided_cert_source 

461 elif mtls.has_default_client_cert_source(): 

462 client_cert_source = mtls.default_client_cert_source() 

463 return client_cert_source 

464 

465 @staticmethod 

466 def _get_api_endpoint( 

467 api_override, client_cert_source, universe_domain, use_mtls_endpoint 

468 ): 

469 """Return the API endpoint used by the client. 

470 

471 Args: 

472 api_override (str): The API endpoint override. If specified, this is always 

473 the return value of this function and the other arguments are not used. 

474 client_cert_source (bytes): The client certificate source used by the client. 

475 universe_domain (str): The universe domain used by the client. 

476 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters. 

477 Possible values are "always", "auto", or "never". 

478 

479 Returns: 

480 str: The API endpoint to be used by the client. 

481 """ 

482 if api_override is not None: 

483 api_endpoint = api_override 

484 elif use_mtls_endpoint == "always" or ( 

485 use_mtls_endpoint == "auto" and client_cert_source 

486 ): 

487 _default_universe = BigQueryReadClient._DEFAULT_UNIVERSE 

488 if universe_domain != _default_universe: 

489 raise MutualTLSChannelError( 

490 f"mTLS is not supported in any universe other than {_default_universe}." 

491 ) 

492 api_endpoint = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT 

493 else: 

494 api_endpoint = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE.format( 

495 UNIVERSE_DOMAIN=universe_domain 

496 ) 

497 return api_endpoint 

498 

499 @staticmethod 

500 def _get_universe_domain( 

501 client_universe_domain: Optional[str], universe_domain_env: Optional[str] 

502 ) -> str: 

503 """Return the universe domain used by the client. 

504 

505 Args: 

506 client_universe_domain (Optional[str]): The universe domain configured via the client options. 

507 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable. 

508 

509 Returns: 

510 str: The universe domain to be used by the client. 

511 

512 Raises: 

513 ValueError: If the universe domain is an empty string. 

514 """ 

515 universe_domain = BigQueryReadClient._DEFAULT_UNIVERSE 

516 if client_universe_domain is not None: 

517 universe_domain = client_universe_domain 

518 elif universe_domain_env is not None: 

519 universe_domain = universe_domain_env 

520 if len(universe_domain.strip()) == 0: 

521 raise ValueError("Universe Domain cannot be an empty string.") 

522 return universe_domain 

523 

524 def _validate_universe_domain(self): 

525 """Validates client's and credentials' universe domains are consistent. 

526 

527 Returns: 

528 bool: True iff the configured universe domain is valid. 

529 

530 Raises: 

531 ValueError: If the configured universe domain is not valid. 

532 """ 

533 

534 # NOTE (b/349488459): universe validation is disabled until further notice. 

535 return True 

536 

537 def _add_cred_info_for_auth_errors( 

538 self, error: core_exceptions.GoogleAPICallError 

539 ) -> None: 

540 """Adds credential info string to error details for 401/403/404 errors. 

541 

542 Args: 

543 error (google.api_core.exceptions.GoogleAPICallError): The error to add the cred info. 

544 """ 

545 if error.code not in [ 

546 HTTPStatus.UNAUTHORIZED, 

547 HTTPStatus.FORBIDDEN, 

548 HTTPStatus.NOT_FOUND, 

549 ]: 

550 return 

551 

552 cred = self._transport._credentials 

553 

554 # get_cred_info is only available in google-auth>=2.35.0 

555 if not hasattr(cred, "get_cred_info"): 

556 return 

557 

558 # ignore the type check since pypy test fails when get_cred_info 

559 # is not available 

560 cred_info = cred.get_cred_info() # type: ignore 

561 if cred_info and hasattr(error._details, "append"): 

562 error._details.append(json.dumps(cred_info)) 

563 

564 @property 

565 def api_endpoint(self): 

566 """Return the API endpoint used by the client instance. 

567 

568 Returns: 

569 str: The API endpoint used by the client instance. 

570 """ 

571 return self._api_endpoint 

572 

573 @property 

574 def universe_domain(self) -> str: 

575 """Return the universe domain used by the client instance. 

576 

577 Returns: 

578 str: The universe domain used by the client instance. 

579 """ 

580 return self._universe_domain 

581 

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Optional[
            Union[str, BigQueryReadTransport, Callable[..., BigQueryReadTransport]]
        ] = None,
        client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the big query read client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Optional[Union[str,BigQueryReadTransport,Callable[..., BigQueryReadTransport]]]):
                The transport to use, or a Callable that constructs and returns a new transport.
                If a Callable is given, it will be called with the same set of initialization
                arguments as used in the BigQueryReadTransport constructor.
                If set to None, a transport is chosen automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which have one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence; and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        # Normalize client_options: accept a dict, None, or a ClientOptions.
        self._client_options = client_options
        if isinstance(self._client_options, dict):
            self._client_options = client_options_lib.from_dict(self._client_options)
        if self._client_options is None:
            self._client_options = client_options_lib.ClientOptions()
        self._client_options = cast(
            client_options_lib.ClientOptions, self._client_options
        )

        universe_domain_opt = getattr(self._client_options, "universe_domain", None)

        # Resolve mTLS / universe-domain configuration from the environment,
        # then combine with the client options.
        (
            self._use_client_cert,
            self._use_mtls_endpoint,
            self._universe_domain_env,
        ) = BigQueryReadClient._read_environment_variables()
        self._client_cert_source = BigQueryReadClient._get_client_cert_source(
            self._client_options.client_cert_source, self._use_client_cert
        )
        self._universe_domain = BigQueryReadClient._get_universe_domain(
            universe_domain_opt, self._universe_domain_env
        )
        self._api_endpoint = None  # updated below, depending on `transport`

        # Initialize the universe domain validation.
        self._is_universe_domain_valid = False

        if CLIENT_LOGGING_SUPPORTED:  # pragma: NO COVER
            # Setup logging.
            client_logging.initialize_logging()

        api_key_value = getattr(self._client_options, "api_key", None)
        if api_key_value and credentials:
            raise ValueError(
                "client_options.api_key and credentials are mutually exclusive"
            )

        # Save or instantiate the transport.
        # Ordinarily, we provide the transport, but allowing a custom transport
        # instance provides an extensibility point for unusual situations.
        transport_provided = isinstance(transport, BigQueryReadTransport)
        if transport_provided:
            # transport is a BigQueryReadTransport instance.
            if credentials or self._client_options.credentials_file or api_key_value:
                raise ValueError(
                    "When providing a transport instance, "
                    "provide its credentials directly."
                )
            if self._client_options.scopes:
                raise ValueError(
                    "When providing a transport instance, provide its scopes "
                    "directly."
                )
            self._transport = cast(BigQueryReadTransport, transport)
            self._api_endpoint = self._transport.host

        # A pre-built transport's host wins; otherwise resolve the endpoint
        # from options, cert source, universe domain and mTLS mode.
        self._api_endpoint = self._api_endpoint or BigQueryReadClient._get_api_endpoint(
            self._client_options.api_endpoint,
            self._client_cert_source,
            self._universe_domain,
            self._use_mtls_endpoint,
        )

        if not transport_provided:
            # Imported lazily; only needed when we build the transport here.
            import google.auth._default  # type: ignore

            if api_key_value and hasattr(
                google.auth._default, "get_api_key_credentials"
            ):
                credentials = google.auth._default.get_api_key_credentials(
                    api_key_value
                )

            # `transport` may be a registry label, None (use default), or a
            # callable/class that builds a transport.
            transport_init: Union[
                Type[BigQueryReadTransport], Callable[..., BigQueryReadTransport]
            ] = (
                BigQueryReadClient.get_transport_class(transport)
                if isinstance(transport, str) or transport is None
                else cast(Callable[..., BigQueryReadTransport], transport)
            )
            # initialize with the provided callable or the passed in class
            self._transport = transport_init(
                credentials=credentials,
                credentials_file=self._client_options.credentials_file,
                host=self._api_endpoint,
                scopes=self._client_options.scopes,
                client_cert_source_for_mtls=self._client_cert_source,
                quota_project_id=self._client_options.quota_project_id,
                client_info=client_info,
                always_use_jwt_access=True,
                api_audience=self._client_options.api_audience,
            )

        # Emit a one-time structured DEBUG record about the created client
        # (sync transports only).
        if "async" not in str(self._transport):
            if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
                std_logging.DEBUG
            ):  # pragma: NO COVER
                _LOGGER.debug(
                    "Created client `google.cloud.bigquery.storage_v1.BigQueryReadClient`.",
                    extra={
                        "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                        "universeDomain": getattr(
                            self._transport._credentials, "universe_domain", ""
                        ),
                        "credentialsType": f"{type(self._transport._credentials).__module__}.{type(self._transport._credentials).__qualname__}",
                        "credentialsInfo": getattr(
                            self.transport._credentials, "get_cred_info", lambda: None
                        )(),
                    }
                    if hasattr(self._transport, "_credentials")
                    else {
                        "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead",
                        "credentialsType": None,
                    },
                )

756 

757 def create_read_session( 

758 self, 

759 request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None, 

760 *, 

761 parent: Optional[str] = None, 

762 read_session: Optional[stream.ReadSession] = None, 

763 max_stream_count: Optional[int] = None, 

764 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

765 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

766 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

767 ) -> stream.ReadSession: 

768 r"""Creates a new read session. A read session divides 

769 the contents of a BigQuery table into one or more 

770 streams, which can then be used to read data from the 

771 table. The read session also specifies properties of the 

772 data to be read, such as a list of columns or a 

773 push-down filter describing the rows to be returned. 

774 

775 A particular row can be read by at most one stream. When 

776 the caller has reached the end of each stream in the 

777 session, then all the data in the table has been read. 

778 

779 Data is assigned to each stream such that roughly the 

780 same number of rows can be read from each stream. 

781 Because the server-side unit for assigning data is 

782 collections of rows, the API does not guarantee that 

783 each stream will return the same number or rows. 

784 Additionally, the limits are enforced based on the 

785 number of pre-filtered rows, so some filters can lead to 

786 lopsided assignments. 

787 

788 Read sessions automatically expire 6 hours after they 

789 are created and do not require manual clean-up by the 

790 caller. 

791 

792 .. code-block:: python 

793 

794 # This snippet has been automatically generated and should be regarded as a 

795 # code template only. 

796 # It will require modifications to work: 

797 # - It may require correct/in-range values for request initialization. 

798 # - It may require specifying regional endpoints when creating the service 

799 # client as shown in: 

800 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

801 from google.cloud import bigquery_storage_v1 

802 

803 def sample_create_read_session(): 

804 # Create a client 

805 client = bigquery_storage_v1.BigQueryReadClient() 

806 

807 # Initialize request argument(s) 

808 request = bigquery_storage_v1.CreateReadSessionRequest( 

809 parent="parent_value", 

810 ) 

811 

812 # Make the request 

813 response = client.create_read_session(request=request) 

814 

815 # Handle the response 

816 print(response) 

817 

818 Args: 

819 request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]): 

820 The request object. Request message for ``CreateReadSession``. 

821 parent (str): 

822 Required. The request project that owns the session, in 

823 the form of ``projects/{project_id}``. 

824 

825 This corresponds to the ``parent`` field 

826 on the ``request`` instance; if ``request`` is provided, this 

827 should not be set. 

828 read_session (google.cloud.bigquery_storage_v1.types.ReadSession): 

829 Required. Session to be created. 

830 This corresponds to the ``read_session`` field 

831 on the ``request`` instance; if ``request`` is provided, this 

832 should not be set. 

833 max_stream_count (int): 

834 Max initial number of streams. If unset or zero, the 

835 server will provide a value of streams so as to produce 

836 reasonable throughput. Must be non-negative. The number 

837 of streams may be lower than the requested number, 

838 depending on the amount parallelism that is reasonable 

839 for the table. There is a default system max limit of 

840 1,000. 

841 

842 This must be greater than or equal to 

843 preferred_min_stream_count. Typically, clients should 

844 either leave this unset to let the system to determine 

845 an upper bound OR set this a size for the maximum "units 

846 of work" it can gracefully handle. 

847 

848 This corresponds to the ``max_stream_count`` field 

849 on the ``request`` instance; if ``request`` is provided, this 

850 should not be set. 

851 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

852 should be retried. 

853 timeout (float): The timeout for this request. 

854 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

855 sent along with the request as metadata. Normally, each value must be of type `str`, 

856 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

857 be of type `bytes`. 

858 

859 Returns: 

860 google.cloud.bigquery_storage_v1.types.ReadSession: 

861 Information about the ReadSession. 

862 """ 

863 # Create or coerce a protobuf request object. 

864 # - Quick check: If we got a request object, we should *not* have 

865 # gotten any keyword arguments that map to the request. 

866 flattened_params = [parent, read_session, max_stream_count] 

867 has_flattened_params = ( 

868 len([param for param in flattened_params if param is not None]) > 0 

869 ) 

870 if request is not None and has_flattened_params: 

871 raise ValueError( 

872 "If the `request` argument is set, then none of " 

873 "the individual field arguments should be set." 

874 ) 

875 

876 # - Use the request object if provided (there's no risk of modifying the input as 

877 # there are no flattened fields), or create one. 

878 if not isinstance(request, storage.CreateReadSessionRequest): 

879 request = storage.CreateReadSessionRequest(request) 

880 # If we have keyword arguments corresponding to fields on the 

881 # request, apply these. 

882 if parent is not None: 

883 request.parent = parent 

884 if read_session is not None: 

885 request.read_session = read_session 

886 if max_stream_count is not None: 

887 request.max_stream_count = max_stream_count 

888 

889 # Wrap the RPC method; this adds retry and timeout information, 

890 # and friendly error handling. 

891 rpc = self._transport._wrapped_methods[self._transport.create_read_session] 

892 

893 # Certain fields should be provided within the metadata header; 

894 # add these here. 

895 metadata = tuple(metadata) + ( 

896 gapic_v1.routing_header.to_grpc_metadata( 

897 (("read_session.table", request.read_session.table),) 

898 ), 

899 ) 

900 

901 # Validate the universe domain. 

902 self._validate_universe_domain() 

903 

904 # Send the request. 

905 response = rpc( 

906 request, 

907 retry=retry, 

908 timeout=timeout, 

909 metadata=metadata, 

910 ) 

911 

912 # Done; return the response. 

913 return response 

914 

915 def read_rows( 

916 self, 

917 request: Optional[Union[storage.ReadRowsRequest, dict]] = None, 

918 *, 

919 read_stream: Optional[str] = None, 

920 offset: Optional[int] = None, 

921 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

922 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

923 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

924 ) -> Iterable[storage.ReadRowsResponse]: 

925 r"""Reads rows from the stream in the format prescribed 

926 by the ReadSession. Each response contains one or more 

927 table rows, up to a maximum of 100 MiB per response; 

928 read requests which attempt to read individual rows 

929 larger than 100 MiB will fail. 

930 

931 Each request also returns a set of stream statistics 

932 reflecting the current state of the stream. 

933 

934 .. code-block:: python 

935 

936 # This snippet has been automatically generated and should be regarded as a 

937 # code template only. 

938 # It will require modifications to work: 

939 # - It may require correct/in-range values for request initialization. 

940 # - It may require specifying regional endpoints when creating the service 

941 # client as shown in: 

942 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

943 from google.cloud import bigquery_storage_v1 

944 

945 def sample_read_rows(): 

946 # Create a client 

947 client = bigquery_storage_v1.BigQueryReadClient() 

948 

949 # Initialize request argument(s) 

950 request = bigquery_storage_v1.ReadRowsRequest( 

951 read_stream="read_stream_value", 

952 ) 

953 

954 # Make the request 

955 stream = client.read_rows(request=request) 

956 

957 # Handle the response 

958 for response in stream: 

959 print(response) 

960 

961 Args: 

962 request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]): 

963 The request object. Request message for ``ReadRows``. 

964 read_stream (str): 

965 Required. Stream to read rows from. 

966 This corresponds to the ``read_stream`` field 

967 on the ``request`` instance; if ``request`` is provided, this 

968 should not be set. 

969 offset (int): 

970 The offset requested must be less 

971 than the last row read from Read. 

972 Requesting a larger offset is undefined. 

973 If not specified, start reading from 

974 offset zero. 

975 

976 This corresponds to the ``offset`` field 

977 on the ``request`` instance; if ``request`` is provided, this 

978 should not be set. 

979 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

980 should be retried. 

981 timeout (float): The timeout for this request. 

982 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

983 sent along with the request as metadata. Normally, each value must be of type `str`, 

984 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

985 be of type `bytes`. 

986 

987 Returns: 

988 Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]: 

989 Response from calling ReadRows may include row data, progress and 

990 throttling information. 

991 

992 """ 

993 # Create or coerce a protobuf request object. 

994 # - Quick check: If we got a request object, we should *not* have 

995 # gotten any keyword arguments that map to the request. 

996 flattened_params = [read_stream, offset] 

997 has_flattened_params = ( 

998 len([param for param in flattened_params if param is not None]) > 0 

999 ) 

1000 if request is not None and has_flattened_params: 

1001 raise ValueError( 

1002 "If the `request` argument is set, then none of " 

1003 "the individual field arguments should be set." 

1004 ) 

1005 

1006 # - Use the request object if provided (there's no risk of modifying the input as 

1007 # there are no flattened fields), or create one. 

1008 if not isinstance(request, storage.ReadRowsRequest): 

1009 request = storage.ReadRowsRequest(request) 

1010 # If we have keyword arguments corresponding to fields on the 

1011 # request, apply these. 

1012 if read_stream is not None: 

1013 request.read_stream = read_stream 

1014 if offset is not None: 

1015 request.offset = offset 

1016 

1017 # Wrap the RPC method; this adds retry and timeout information, 

1018 # and friendly error handling. 

1019 rpc = self._transport._wrapped_methods[self._transport.read_rows] 

1020 

1021 # Certain fields should be provided within the metadata header; 

1022 # add these here. 

1023 metadata = tuple(metadata) + ( 

1024 gapic_v1.routing_header.to_grpc_metadata( 

1025 (("read_stream", request.read_stream),) 

1026 ), 

1027 ) 

1028 

1029 # Validate the universe domain. 

1030 self._validate_universe_domain() 

1031 

1032 # Send the request. 

1033 response = rpc( 

1034 request, 

1035 retry=retry, 

1036 timeout=timeout, 

1037 metadata=metadata, 

1038 ) 

1039 

1040 # Done; return the response. 

1041 return response 

1042 

1043 def split_read_stream( 

1044 self, 

1045 request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None, 

1046 *, 

1047 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

1048 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

1049 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

1050 ) -> storage.SplitReadStreamResponse: 

1051 r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects. 

1052 These ``ReadStream`` objects are referred to as the primary and 

1053 the residual streams of the split. The original ``ReadStream`` 

1054 can still be read from in the same manner as before. Both of the 

1055 returned ``ReadStream`` objects can also be read from, and the 

1056 rows returned by both child streams will be the same as the rows 

1057 read from the original stream. 

1058 

1059 Moreover, the two child streams will be allocated back-to-back 

1060 in the original ``ReadStream``. Concretely, it is guaranteed 

1061 that for streams original, primary, and residual, that 

1062 original[0-j] = primary[0-j] and original[j-n] = residual[0-m] 

1063 once the streams have been read to completion. 

1064 

1065 .. code-block:: python 

1066 

1067 # This snippet has been automatically generated and should be regarded as a 

1068 # code template only. 

1069 # It will require modifications to work: 

1070 # - It may require correct/in-range values for request initialization. 

1071 # - It may require specifying regional endpoints when creating the service 

1072 # client as shown in: 

1073 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1074 from google.cloud import bigquery_storage_v1 

1075 

1076 def sample_split_read_stream(): 

1077 # Create a client 

1078 client = bigquery_storage_v1.BigQueryReadClient() 

1079 

1080 # Initialize request argument(s) 

1081 request = bigquery_storage_v1.SplitReadStreamRequest( 

1082 name="name_value", 

1083 ) 

1084 

1085 # Make the request 

1086 response = client.split_read_stream(request=request) 

1087 

1088 # Handle the response 

1089 print(response) 

1090 

1091 Args: 

1092 request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]): 

1093 The request object. Request message for ``SplitReadStream``. 

1094 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1095 should be retried. 

1096 timeout (float): The timeout for this request. 

1097 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

1098 sent along with the request as metadata. Normally, each value must be of type `str`, 

1099 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

1100 be of type `bytes`. 

1101 

1102 Returns: 

1103 google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse: 

1104 Response message for SplitReadStream. 

1105 """ 

1106 # Create or coerce a protobuf request object. 

1107 # - Use the request object if provided (there's no risk of modifying the input as 

1108 # there are no flattened fields), or create one. 

1109 if not isinstance(request, storage.SplitReadStreamRequest): 

1110 request = storage.SplitReadStreamRequest(request) 

1111 

1112 # Wrap the RPC method; this adds retry and timeout information, 

1113 # and friendly error handling. 

1114 rpc = self._transport._wrapped_methods[self._transport.split_read_stream] 

1115 

1116 # Certain fields should be provided within the metadata header; 

1117 # add these here. 

1118 metadata = tuple(metadata) + ( 

1119 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

1120 ) 

1121 

1122 # Validate the universe domain. 

1123 self._validate_universe_domain() 

1124 

1125 # Send the request. 

1126 response = rpc( 

1127 request, 

1128 retry=retry, 

1129 timeout=timeout, 

1130 metadata=metadata, 

1131 ) 

1132 

1133 # Done; return the response. 

1134 return response 

1135 

    def __enter__(self) -> "BigQueryReadClient":
        """Enter the runtime context and return this client unchanged.

        Pairs with :meth:`__exit__`, which closes the underlying
        transport — only use the client as a context manager when the
        transport is not shared with other clients.
        """
        return self

1138 

    def __exit__(self, type, value, traceback):
        """Releases underlying transport's resources.

        .. warning::
            ONLY use as a context manager if the transport is NOT shared
            with other clients! Exiting the with block will CLOSE the transport
            and may cause errors in other clients!
        """
        # Close the transport unconditionally; returning None (implicitly)
        # means any in-flight exception is propagated, not suppressed.
        self.transport.close()

1148 

1149 

# Default client metadata attached to outgoing requests, reporting this
# GAPIC package's version to the service.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)

# Older google-api-core releases lack the `protobuf_runtime_version`
# attribute on ClientInfo, so only populate it when present.
if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"):  # pragma: NO COVER
    DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__

# Public API of this module.
__all__ = ("BigQueryReadClient",)