Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

273 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17import os 

18import re 

19from typing import ( 

20 Dict, 

21 Iterable, 

22 Mapping, 

23 MutableMapping, 

24 MutableSequence, 

25 Optional, 

26 Sequence, 

27 Tuple, 

28 Type, 

29 Union, 

30 cast, 

31) 

32import warnings 

33 

34from google.api_core import client_options as client_options_lib 

35from google.api_core import exceptions as core_exceptions 

36from google.api_core import gapic_v1 

37from google.api_core import retry as retries 

38from google.auth import credentials as ga_credentials # type: ignore 

39from google.auth.exceptions import MutualTLSChannelError # type: ignore 

40from google.auth.transport import mtls # type: ignore 

41from google.auth.transport.grpc import SslCredentials # type: ignore 

42from google.oauth2 import service_account # type: ignore 

43 

44from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

45 

46try: 

47 OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] 

48except AttributeError: # pragma: NO COVER 

49 OptionalRetry = Union[retries.Retry, object, None] # type: ignore 

50 

51from google.protobuf import timestamp_pb2 # type: ignore 

52 

53from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream 

54 

55from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport 

56from .transports.grpc import BigQueryReadGrpcTransport 

57from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport 

58 

59 

class BigQueryReadClientMeta(type):
    """Metaclass for the BigQueryRead client.

    Supplies class-level helpers for building and retrieving support
    objects (e.g. transport) so the client instances themselves stay
    free of that machinery.
    """

    _transport_registry = OrderedDict()  # type: Dict[str, Type[BigQueryReadTransport]]
    _transport_registry["grpc"] = BigQueryReadGrpcTransport
    _transport_registry["grpc_asyncio"] = BigQueryReadGrpcAsyncIOTransport

    def get_transport_class(
        cls,
        label: Optional[str] = None,
    ) -> Type[BigQueryReadTransport]:
        """Return an appropriate transport class.

        Args:
            label: The name of the desired transport. If none is
                provided, then the first transport in the registry is used.

        Returns:
            The transport class to use.
        """
        if not label:
            # No explicit choice was made; fall back to the default,
            # i.e. the first transport registered.
            return next(iter(cls._transport_registry.values()))
        return cls._transport_registry[label]

92 

93 

94class BigQueryReadClient(metaclass=BigQueryReadClientMeta): 

95 """BigQuery Read API. 

96 

97 The Read API can be used to read data from BigQuery. 

98 """ 

99 

100 @staticmethod 

101 def _get_default_mtls_endpoint(api_endpoint): 

102 """Converts api endpoint to mTLS endpoint. 

103 

104 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to 

105 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively. 

106 Args: 

107 api_endpoint (Optional[str]): the api endpoint to convert. 

108 Returns: 

109 str: converted mTLS api endpoint. 

110 """ 

111 if not api_endpoint: 

112 return api_endpoint 

113 

114 mtls_endpoint_re = re.compile( 

115 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?" 

116 ) 

117 

118 m = mtls_endpoint_re.match(api_endpoint) 

119 name, mtls, sandbox, googledomain = m.groups() 

120 if mtls or not googledomain: 

121 return api_endpoint 

122 

123 if sandbox: 

124 return api_endpoint.replace( 

125 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com" 

126 ) 

127 

128 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com") 

129 

    # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead.
    DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
    # Derived from DEFAULT_ENDPOINT. ``__func__`` unwraps the staticmethod
    # descriptor, which is not directly callable at class-body scope.
    DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__(  # type: ignore
        DEFAULT_ENDPOINT
    )

    # Endpoint template filled in with the configured universe domain.
    _DEFAULT_ENDPOINT_TEMPLATE = "bigquerystorage.{UNIVERSE_DOMAIN}"
    # Universe domain assumed when none is configured explicitly.
    _DEFAULT_UNIVERSE = "googleapis.com"

138 

139 @classmethod 

140 def from_service_account_info(cls, info: dict, *args, **kwargs): 

141 """Creates an instance of this client using the provided credentials 

142 info. 

143 

144 Args: 

145 info (dict): The service account private key info. 

146 args: Additional arguments to pass to the constructor. 

147 kwargs: Additional arguments to pass to the constructor. 

148 

149 Returns: 

150 BigQueryReadClient: The constructed client. 

151 """ 

152 credentials = service_account.Credentials.from_service_account_info(info) 

153 kwargs["credentials"] = credentials 

154 return cls(*args, **kwargs) 

155 

156 @classmethod 

157 def from_service_account_file(cls, filename: str, *args, **kwargs): 

158 """Creates an instance of this client using the provided credentials 

159 file. 

160 

161 Args: 

162 filename (str): The path to the service account private key json 

163 file. 

164 args: Additional arguments to pass to the constructor. 

165 kwargs: Additional arguments to pass to the constructor. 

166 

167 Returns: 

168 BigQueryReadClient: The constructed client. 

169 """ 

170 credentials = service_account.Credentials.from_service_account_file(filename) 

171 kwargs["credentials"] = credentials 

172 return cls(*args, **kwargs) 

173 

174 from_service_account_json = from_service_account_file 

175 

176 @property 

177 def transport(self) -> BigQueryReadTransport: 

178 """Returns the transport used by the client instance. 

179 

180 Returns: 

181 BigQueryReadTransport: The transport used by the client 

182 instance. 

183 """ 

184 return self._transport 

185 

186 @staticmethod 

187 def read_session_path( 

188 project: str, 

189 location: str, 

190 session: str, 

191 ) -> str: 

192 """Returns a fully-qualified read_session string.""" 

193 return "projects/{project}/locations/{location}/sessions/{session}".format( 

194 project=project, 

195 location=location, 

196 session=session, 

197 ) 

198 

199 @staticmethod 

200 def parse_read_session_path(path: str) -> Dict[str, str]: 

201 """Parses a read_session path into its component segments.""" 

202 m = re.match( 

203 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$", 

204 path, 

205 ) 

206 return m.groupdict() if m else {} 

207 

208 @staticmethod 

209 def read_stream_path( 

210 project: str, 

211 location: str, 

212 session: str, 

213 stream: str, 

214 ) -> str: 

215 """Returns a fully-qualified read_stream string.""" 

216 return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format( 

217 project=project, 

218 location=location, 

219 session=session, 

220 stream=stream, 

221 ) 

222 

223 @staticmethod 

224 def parse_read_stream_path(path: str) -> Dict[str, str]: 

225 """Parses a read_stream path into its component segments.""" 

226 m = re.match( 

227 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$", 

228 path, 

229 ) 

230 return m.groupdict() if m else {} 

231 

232 @staticmethod 

233 def table_path( 

234 project: str, 

235 dataset: str, 

236 table: str, 

237 ) -> str: 

238 """Returns a fully-qualified table string.""" 

239 return "projects/{project}/datasets/{dataset}/tables/{table}".format( 

240 project=project, 

241 dataset=dataset, 

242 table=table, 

243 ) 

244 

245 @staticmethod 

246 def parse_table_path(path: str) -> Dict[str, str]: 

247 """Parses a table path into its component segments.""" 

248 m = re.match( 

249 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$", 

250 path, 

251 ) 

252 return m.groupdict() if m else {} 

253 

254 @staticmethod 

255 def common_billing_account_path( 

256 billing_account: str, 

257 ) -> str: 

258 """Returns a fully-qualified billing_account string.""" 

259 return "billingAccounts/{billing_account}".format( 

260 billing_account=billing_account, 

261 ) 

262 

263 @staticmethod 

264 def parse_common_billing_account_path(path: str) -> Dict[str, str]: 

265 """Parse a billing_account path into its component segments.""" 

266 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path) 

267 return m.groupdict() if m else {} 

268 

269 @staticmethod 

270 def common_folder_path( 

271 folder: str, 

272 ) -> str: 

273 """Returns a fully-qualified folder string.""" 

274 return "folders/{folder}".format( 

275 folder=folder, 

276 ) 

277 

278 @staticmethod 

279 def parse_common_folder_path(path: str) -> Dict[str, str]: 

280 """Parse a folder path into its component segments.""" 

281 m = re.match(r"^folders/(?P<folder>.+?)$", path) 

282 return m.groupdict() if m else {} 

283 

284 @staticmethod 

285 def common_organization_path( 

286 organization: str, 

287 ) -> str: 

288 """Returns a fully-qualified organization string.""" 

289 return "organizations/{organization}".format( 

290 organization=organization, 

291 ) 

292 

293 @staticmethod 

294 def parse_common_organization_path(path: str) -> Dict[str, str]: 

295 """Parse a organization path into its component segments.""" 

296 m = re.match(r"^organizations/(?P<organization>.+?)$", path) 

297 return m.groupdict() if m else {} 

298 

299 @staticmethod 

300 def common_project_path( 

301 project: str, 

302 ) -> str: 

303 """Returns a fully-qualified project string.""" 

304 return "projects/{project}".format( 

305 project=project, 

306 ) 

307 

308 @staticmethod 

309 def parse_common_project_path(path: str) -> Dict[str, str]: 

310 """Parse a project path into its component segments.""" 

311 m = re.match(r"^projects/(?P<project>.+?)$", path) 

312 return m.groupdict() if m else {} 

313 

314 @staticmethod 

315 def common_location_path( 

316 project: str, 

317 location: str, 

318 ) -> str: 

319 """Returns a fully-qualified location string.""" 

320 return "projects/{project}/locations/{location}".format( 

321 project=project, 

322 location=location, 

323 ) 

324 

325 @staticmethod 

326 def parse_common_location_path(path: str) -> Dict[str, str]: 

327 """Parse a location path into its component segments.""" 

328 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path) 

329 return m.groupdict() if m else {} 

330 

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[client_options_lib.ClientOptions] = None
    ):
        """Deprecated. Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the
        client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one; if the
        default client cert source exists, use the default one; otherwise the client cert
        source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always", use the
        default mTLS endpoint; if the environment variable is "never", use the default API
        endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise
        use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """

        warnings.warn(
            "get_mtls_endpoint_and_cert_source is deprecated. Use the api_endpoint property instead.",
            DeprecationWarning,
        )
        if client_options is None:
            client_options = client_options_lib.ClientOptions()
        # NOTE(review): unlike _read_environment_variables(), these values are
        # not lower-cased, so mixed-case values such as "True" are rejected
        # here — confirm whether that case-sensitivity is intentional.
        use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
        use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")
        if use_client_cert not in ("true", "false"):
            raise ValueError(
                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
            )
        if use_mtls_endpoint not in ("auto", "never", "always"):
            raise MutualTLSChannelError(
                "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
            )

        # Figure out the client cert source to use.
        client_cert_source = None
        if use_client_cert == "true":
            # An explicitly supplied cert source wins over the system default.
            if client_options.client_cert_source:
                client_cert_source = client_options.client_cert_source
            elif mtls.has_default_client_cert_source():
                client_cert_source = mtls.default_client_cert_source()

        # Figure out which api endpoint to use.
        if client_options.api_endpoint is not None:
            api_endpoint = client_options.api_endpoint
        elif use_mtls_endpoint == "always" or (
            use_mtls_endpoint == "auto" and client_cert_source
        ):
            api_endpoint = cls.DEFAULT_MTLS_ENDPOINT
        else:
            api_endpoint = cls.DEFAULT_ENDPOINT

        return api_endpoint, client_cert_source

402 

403 @staticmethod 

404 def _read_environment_variables(): 

405 """Returns the environment variables used by the client. 

406 

407 Returns: 

408 Tuple[bool, str, str]: returns the GOOGLE_API_USE_CLIENT_CERTIFICATE, 

409 GOOGLE_API_USE_MTLS_ENDPOINT, and GOOGLE_CLOUD_UNIVERSE_DOMAIN environment variables. 

410 

411 Raises: 

412 ValueError: If GOOGLE_API_USE_CLIENT_CERTIFICATE is not 

413 any of ["true", "false"]. 

414 google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT 

415 is not any of ["auto", "never", "always"]. 

416 """ 

417 use_client_cert = os.getenv( 

418 "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" 

419 ).lower() 

420 use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower() 

421 universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN") 

422 if use_client_cert not in ("true", "false"): 

423 raise ValueError( 

424 "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" 

425 ) 

426 if use_mtls_endpoint not in ("auto", "never", "always"): 

427 raise MutualTLSChannelError( 

428 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" 

429 ) 

430 return use_client_cert == "true", use_mtls_endpoint, universe_domain_env 

431 

432 @staticmethod 

433 def _get_client_cert_source(provided_cert_source, use_cert_flag): 

434 """Return the client cert source to be used by the client. 

435 

436 Args: 

437 provided_cert_source (bytes): The client certificate source provided. 

438 use_cert_flag (bool): A flag indicating whether to use the client certificate. 

439 

440 Returns: 

441 bytes or None: The client cert source to be used by the client. 

442 """ 

443 client_cert_source = None 

444 if use_cert_flag: 

445 if provided_cert_source: 

446 client_cert_source = provided_cert_source 

447 elif mtls.has_default_client_cert_source(): 

448 client_cert_source = mtls.default_client_cert_source() 

449 return client_cert_source 

450 

451 @staticmethod 

452 def _get_api_endpoint( 

453 api_override, client_cert_source, universe_domain, use_mtls_endpoint 

454 ): 

455 """Return the API endpoint used by the client. 

456 

457 Args: 

458 api_override (str): The API endpoint override. If specified, this is always 

459 the return value of this function and the other arguments are not used. 

460 client_cert_source (bytes): The client certificate source used by the client. 

461 universe_domain (str): The universe domain used by the client. 

462 use_mtls_endpoint (str): How to use the mTLS endpoint, which depends also on the other parameters. 

463 Possible values are "always", "auto", or "never". 

464 

465 Returns: 

466 str: The API endpoint to be used by the client. 

467 """ 

468 if api_override is not None: 

469 api_endpoint = api_override 

470 elif use_mtls_endpoint == "always" or ( 

471 use_mtls_endpoint == "auto" and client_cert_source 

472 ): 

473 _default_universe = BigQueryReadClient._DEFAULT_UNIVERSE 

474 if universe_domain != _default_universe: 

475 raise MutualTLSChannelError( 

476 f"mTLS is not supported in any universe other than {_default_universe}." 

477 ) 

478 api_endpoint = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT 

479 else: 

480 api_endpoint = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE.format( 

481 UNIVERSE_DOMAIN=universe_domain 

482 ) 

483 return api_endpoint 

484 

485 @staticmethod 

486 def _get_universe_domain( 

487 client_universe_domain: Optional[str], universe_domain_env: Optional[str] 

488 ) -> str: 

489 """Return the universe domain used by the client. 

490 

491 Args: 

492 client_universe_domain (Optional[str]): The universe domain configured via the client options. 

493 universe_domain_env (Optional[str]): The universe domain configured via the "GOOGLE_CLOUD_UNIVERSE_DOMAIN" environment variable. 

494 

495 Returns: 

496 str: The universe domain to be used by the client. 

497 

498 Raises: 

499 ValueError: If the universe domain is an empty string. 

500 """ 

501 universe_domain = BigQueryReadClient._DEFAULT_UNIVERSE 

502 if client_universe_domain is not None: 

503 universe_domain = client_universe_domain 

504 elif universe_domain_env is not None: 

505 universe_domain = universe_domain_env 

506 if len(universe_domain.strip()) == 0: 

507 raise ValueError("Universe Domain cannot be an empty string.") 

508 return universe_domain 

509 

510 @staticmethod 

511 def _compare_universes( 

512 client_universe: str, credentials: ga_credentials.Credentials 

513 ) -> bool: 

514 """Returns True iff the universe domains used by the client and credentials match. 

515 

516 Args: 

517 client_universe (str): The universe domain configured via the client options. 

518 credentials (ga_credentials.Credentials): The credentials being used in the client. 

519 

520 Returns: 

521 bool: True iff client_universe matches the universe in credentials. 

522 

523 Raises: 

524 ValueError: when client_universe does not match the universe in credentials. 

525 """ 

526 

527 default_universe = BigQueryReadClient._DEFAULT_UNIVERSE 

528 credentials_universe = getattr(credentials, "universe_domain", default_universe) 

529 

530 if client_universe != credentials_universe: 

531 raise ValueError( 

532 "The configured universe domain " 

533 f"({client_universe}) does not match the universe domain " 

534 f"found in the credentials ({credentials_universe}). " 

535 "If you haven't configured the universe domain explicitly, " 

536 f"`{default_universe}` is the default." 

537 ) 

538 return True 

539 

540 def _validate_universe_domain(self): 

541 """Validates client's and credentials' universe domains are consistent. 

542 

543 Returns: 

544 bool: True iff the configured universe domain is valid. 

545 

546 Raises: 

547 ValueError: If the configured universe domain is not valid. 

548 """ 

549 self._is_universe_domain_valid = ( 

550 self._is_universe_domain_valid 

551 or BigQueryReadClient._compare_universes( 

552 self.universe_domain, self.transport._credentials 

553 ) 

554 ) 

555 return self._is_universe_domain_valid 

556 

557 @property 

558 def api_endpoint(self): 

559 """Return the API endpoint used by the client instance. 

560 

561 Returns: 

562 str: The API endpoint used by the client instance. 

563 """ 

564 return self._api_endpoint 

565 

566 @property 

567 def universe_domain(self) -> str: 

568 """Return the universe domain used by the client instance. 

569 

570 Returns: 

571 str: The universe domain used by the client instance. 

572 """ 

573 return self._universe_domain 

574 

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Optional[Union[str, BigQueryReadTransport]] = None,
        client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the big query read client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, BigQueryReadTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
                Custom options for the client.

                1. The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client when ``transport`` is
                not explicitly provided. Only if this property is not set and
                ``transport`` was not explicitly provided, the endpoint is
                determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment
                variable, which have one of the following values:
                "always" (always use the default mTLS endpoint), "never" (always
                use the default regular endpoint) and "auto" (auto-switch to the
                default mTLS endpoint if client certificate is present; this is
                the default value).

                2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable
                is "true", then the ``client_cert_source`` property can be used
                to provide a client certificate for mTLS transport. If
                not provided, the default SSL client certificate will be used if
                present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not
                set, no client certificate will be used.

                3. The ``universe_domain`` property can be used to override the
                default "googleapis.com" universe. Note that the ``api_endpoint``
                property still takes precedence; and ``universe_domain`` is
                currently not supported for mTLS.

            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        # Normalize client_options: accept a dict, None, or a ClientOptions
        # instance, and end up with a ClientOptions either way.
        self._client_options = client_options
        if isinstance(self._client_options, dict):
            self._client_options = client_options_lib.from_dict(self._client_options)
        if self._client_options is None:
            self._client_options = client_options_lib.ClientOptions()
        self._client_options = cast(
            client_options_lib.ClientOptions, self._client_options
        )

        universe_domain_opt = getattr(self._client_options, "universe_domain", None)

        # Resolve mTLS and universe-domain configuration from environment
        # variables and the (normalized) client options.
        (
            self._use_client_cert,
            self._use_mtls_endpoint,
            self._universe_domain_env,
        ) = BigQueryReadClient._read_environment_variables()
        self._client_cert_source = BigQueryReadClient._get_client_cert_source(
            self._client_options.client_cert_source, self._use_client_cert
        )
        self._universe_domain = BigQueryReadClient._get_universe_domain(
            universe_domain_opt, self._universe_domain_env
        )
        self._api_endpoint = None  # updated below, depending on `transport`

        # Initialize the universe domain validation.
        self._is_universe_domain_valid = False

        api_key_value = getattr(self._client_options, "api_key", None)
        if api_key_value and credentials:
            raise ValueError(
                "client_options.api_key and credentials are mutually exclusive"
            )

        # Save or instantiate the transport.
        # Ordinarily, we provide the transport, but allowing a custom transport
        # instance provides an extensibility point for unusual situations.
        transport_provided = isinstance(transport, BigQueryReadTransport)
        if transport_provided:
            # transport is a BigQueryReadTransport instance.
            if credentials or self._client_options.credentials_file or api_key_value:
                raise ValueError(
                    "When providing a transport instance, "
                    "provide its credentials directly."
                )
            if self._client_options.scopes:
                raise ValueError(
                    "When providing a transport instance, provide its scopes "
                    "directly."
                )
            self._transport = cast(BigQueryReadTransport, transport)
            self._api_endpoint = self._transport.host

        # The transport's host (if a transport instance was given) takes
        # precedence; otherwise compute the endpoint from the options.
        self._api_endpoint = self._api_endpoint or BigQueryReadClient._get_api_endpoint(
            self._client_options.api_endpoint,
            self._client_cert_source,
            self._universe_domain,
            self._use_mtls_endpoint,
        )

        if not transport_provided:
            import google.auth._default  # type: ignore

            # Exchange an API key for credentials when supported by the
            # installed google-auth version.
            if api_key_value and hasattr(
                google.auth._default, "get_api_key_credentials"
            ):
                credentials = google.auth._default.get_api_key_credentials(
                    api_key_value
                )

            Transport = type(self).get_transport_class(cast(str, transport))
            self._transport = Transport(
                credentials=credentials,
                credentials_file=self._client_options.credentials_file,
                host=self._api_endpoint,
                scopes=self._client_options.scopes,
                client_cert_source_for_mtls=self._client_cert_source,
                quota_project_id=self._client_options.quota_project_id,
                client_info=client_info,
                always_use_jwt_access=True,
                api_audience=self._client_options.api_audience,
            )

711 

712 def create_read_session( 

713 self, 

714 request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None, 

715 *, 

716 parent: Optional[str] = None, 

717 read_session: Optional[stream.ReadSession] = None, 

718 max_stream_count: Optional[int] = None, 

719 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

720 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

721 metadata: Sequence[Tuple[str, str]] = (), 

722 ) -> stream.ReadSession: 

723 r"""Creates a new read session. A read session divides 

724 the contents of a BigQuery table into one or more 

725 streams, which can then be used to read data from the 

726 table. The read session also specifies properties of the 

727 data to be read, such as a list of columns or a 

728 push-down filter describing the rows to be returned. 

729 

730 A particular row can be read by at most one stream. When 

731 the caller has reached the end of each stream in the 

732 session, then all the data in the table has been read. 

733 

734 Data is assigned to each stream such that roughly the 

735 same number of rows can be read from each stream. 

736 Because the server-side unit for assigning data is 

737 collections of rows, the API does not guarantee that 

738 each stream will return the same number or rows. 

739 Additionally, the limits are enforced based on the 

740 number of pre-filtered rows, so some filters can lead to 

741 lopsided assignments. 

742 

743 Read sessions automatically expire 6 hours after they 

744 are created and do not require manual clean-up by the 

745 caller. 

746 

747 .. code-block:: python 

748 

749 # This snippet has been automatically generated and should be regarded as a 

750 # code template only. 

751 # It will require modifications to work: 

752 # - It may require correct/in-range values for request initialization. 

753 # - It may require specifying regional endpoints when creating the service 

754 # client as shown in: 

755 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

756 from google.cloud import bigquery_storage_v1 

757 

758 def sample_create_read_session(): 

759 # Create a client 

760 client = bigquery_storage_v1.BigQueryReadClient() 

761 

762 # Initialize request argument(s) 

763 request = bigquery_storage_v1.CreateReadSessionRequest( 

764 parent="parent_value", 

765 ) 

766 

767 # Make the request 

768 response = client.create_read_session(request=request) 

769 

770 # Handle the response 

771 print(response) 

772 

773 Args: 

774 request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]): 

775 The request object. Request message for ``CreateReadSession``. 

776 parent (str): 

777 Required. The request project that owns the session, in 

778 the form of ``projects/{project_id}``. 

779 

780 This corresponds to the ``parent`` field 

781 on the ``request`` instance; if ``request`` is provided, this 

782 should not be set. 

783 read_session (google.cloud.bigquery_storage_v1.types.ReadSession): 

784 Required. Session to be created. 

785 This corresponds to the ``read_session`` field 

786 on the ``request`` instance; if ``request`` is provided, this 

787 should not be set. 

788 max_stream_count (int): 

789 Max initial number of streams. If unset or zero, the 

790 server will provide a value of streams so as to produce 

791 reasonable throughput. Must be non-negative. The number 

792 of streams may be lower than the requested number, 

793 depending on the amount parallelism that is reasonable 

794 for the table. There is a default system max limit of 

795 1,000. 

796 

797 This must be greater than or equal to 

798 preferred_min_stream_count. Typically, clients should 

799 either leave this unset to let the system to determine 

800 an upper bound OR set this a size for the maximum "units 

801 of work" it can gracefully handle. 

802 

803 This corresponds to the ``max_stream_count`` field 

804 on the ``request`` instance; if ``request`` is provided, this 

805 should not be set. 

806 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

807 should be retried. 

808 timeout (float): The timeout for this request. 

809 metadata (Sequence[Tuple[str, str]]): Strings which should be 

810 sent along with the request as metadata. 

811 

812 Returns: 

813 google.cloud.bigquery_storage_v1.types.ReadSession: 

814 Information about the ReadSession. 

815 """ 

816 # Create or coerce a protobuf request object. 

817 # Quick check: If we got a request object, we should *not* have 

818 # gotten any keyword arguments that map to the request. 

819 has_flattened_params = any([parent, read_session, max_stream_count]) 

820 if request is not None and has_flattened_params: 

821 raise ValueError( 

822 "If the `request` argument is set, then none of " 

823 "the individual field arguments should be set." 

824 ) 

825 

826 # Minor optimization to avoid making a copy if the user passes 

827 # in a storage.CreateReadSessionRequest. 

828 # There's no risk of modifying the input as we've already verified 

829 # there are no flattened fields. 

830 if not isinstance(request, storage.CreateReadSessionRequest): 

831 request = storage.CreateReadSessionRequest(request) 

832 # If we have keyword arguments corresponding to fields on the 

833 # request, apply these. 

834 if parent is not None: 

835 request.parent = parent 

836 if read_session is not None: 

837 request.read_session = read_session 

838 if max_stream_count is not None: 

839 request.max_stream_count = max_stream_count 

840 

841 # Wrap the RPC method; this adds retry and timeout information, 

842 # and friendly error handling. 

843 rpc = self._transport._wrapped_methods[self._transport.create_read_session] 

844 

845 # Certain fields should be provided within the metadata header; 

846 # add these here. 

847 metadata = tuple(metadata) + ( 

848 gapic_v1.routing_header.to_grpc_metadata( 

849 (("read_session.table", request.read_session.table),) 

850 ), 

851 ) 

852 

853 # Validate the universe domain. 

854 self._validate_universe_domain() 

855 

856 # Send the request. 

857 response = rpc( 

858 request, 

859 retry=retry, 

860 timeout=timeout, 

861 metadata=metadata, 

862 ) 

863 

864 # Done; return the response. 

865 return response 

866 

867 def read_rows( 

868 self, 

869 request: Optional[Union[storage.ReadRowsRequest, dict]] = None, 

870 *, 

871 read_stream: Optional[str] = None, 

872 offset: Optional[int] = None, 

873 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

874 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

875 metadata: Sequence[Tuple[str, str]] = (), 

876 ) -> Iterable[storage.ReadRowsResponse]: 

877 r"""Reads rows from the stream in the format prescribed 

878 by the ReadSession. Each response contains one or more 

879 table rows, up to a maximum of 100 MiB per response; 

880 read requests which attempt to read individual rows 

881 larger than 100 MiB will fail. 

882 

883 Each request also returns a set of stream statistics 

884 reflecting the current state of the stream. 

885 

886 .. code-block:: python 

887 

888 # This snippet has been automatically generated and should be regarded as a 

889 # code template only. 

890 # It will require modifications to work: 

891 # - It may require correct/in-range values for request initialization. 

892 # - It may require specifying regional endpoints when creating the service 

893 # client as shown in: 

894 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

895 from google.cloud import bigquery_storage_v1 

896 

897 def sample_read_rows(): 

898 # Create a client 

899 client = bigquery_storage_v1.BigQueryReadClient() 

900 

901 # Initialize request argument(s) 

902 request = bigquery_storage_v1.ReadRowsRequest( 

903 read_stream="read_stream_value", 

904 ) 

905 

906 # Make the request 

907 stream = client.read_rows(request=request) 

908 

909 # Handle the response 

910 for response in stream: 

911 print(response) 

912 

913 Args: 

914 request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]): 

915 The request object. Request message for ``ReadRows``. 

916 read_stream (str): 

917 Required. Stream to read rows from. 

918 This corresponds to the ``read_stream`` field 

919 on the ``request`` instance; if ``request`` is provided, this 

920 should not be set. 

921 offset (int): 

922 The offset requested must be less 

923 than the last row read from Read. 

924 Requesting a larger offset is undefined. 

925 If not specified, start reading from 

926 offset zero. 

927 

928 This corresponds to the ``offset`` field 

929 on the ``request`` instance; if ``request`` is provided, this 

930 should not be set. 

931 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

932 should be retried. 

933 timeout (float): The timeout for this request. 

934 metadata (Sequence[Tuple[str, str]]): Strings which should be 

935 sent along with the request as metadata. 

936 

937 Returns: 

938 Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]: 

939 Response from calling ReadRows may include row data, progress and 

940 throttling information. 

941 

942 """ 

943 # Create or coerce a protobuf request object. 

944 # Quick check: If we got a request object, we should *not* have 

945 # gotten any keyword arguments that map to the request. 

946 has_flattened_params = any([read_stream, offset]) 

947 if request is not None and has_flattened_params: 

948 raise ValueError( 

949 "If the `request` argument is set, then none of " 

950 "the individual field arguments should be set." 

951 ) 

952 

953 # Minor optimization to avoid making a copy if the user passes 

954 # in a storage.ReadRowsRequest. 

955 # There's no risk of modifying the input as we've already verified 

956 # there are no flattened fields. 

957 if not isinstance(request, storage.ReadRowsRequest): 

958 request = storage.ReadRowsRequest(request) 

959 # If we have keyword arguments corresponding to fields on the 

960 # request, apply these. 

961 if read_stream is not None: 

962 request.read_stream = read_stream 

963 if offset is not None: 

964 request.offset = offset 

965 

966 # Wrap the RPC method; this adds retry and timeout information, 

967 # and friendly error handling. 

968 rpc = self._transport._wrapped_methods[self._transport.read_rows] 

969 

970 # Certain fields should be provided within the metadata header; 

971 # add these here. 

972 metadata = tuple(metadata) + ( 

973 gapic_v1.routing_header.to_grpc_metadata( 

974 (("read_stream", request.read_stream),) 

975 ), 

976 ) 

977 

978 # Validate the universe domain. 

979 self._validate_universe_domain() 

980 

981 # Send the request. 

982 response = rpc( 

983 request, 

984 retry=retry, 

985 timeout=timeout, 

986 metadata=metadata, 

987 ) 

988 

989 # Done; return the response. 

990 return response 

991 

992 def split_read_stream( 

993 self, 

994 request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None, 

995 *, 

996 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

997 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

998 metadata: Sequence[Tuple[str, str]] = (), 

999 ) -> storage.SplitReadStreamResponse: 

1000 r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects. 

1001 These ``ReadStream`` objects are referred to as the primary and 

1002 the residual streams of the split. The original ``ReadStream`` 

1003 can still be read from in the same manner as before. Both of the 

1004 returned ``ReadStream`` objects can also be read from, and the 

1005 rows returned by both child streams will be the same as the rows 

1006 read from the original stream. 

1007 

1008 Moreover, the two child streams will be allocated back-to-back 

1009 in the original ``ReadStream``. Concretely, it is guaranteed 

1010 that for streams original, primary, and residual, that 

1011 original[0-j] = primary[0-j] and original[j-n] = residual[0-m] 

1012 once the streams have been read to completion. 

1013 

1014 .. code-block:: python 

1015 

1016 # This snippet has been automatically generated and should be regarded as a 

1017 # code template only. 

1018 # It will require modifications to work: 

1019 # - It may require correct/in-range values for request initialization. 

1020 # - It may require specifying regional endpoints when creating the service 

1021 # client as shown in: 

1022 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

1023 from google.cloud import bigquery_storage_v1 

1024 

1025 def sample_split_read_stream(): 

1026 # Create a client 

1027 client = bigquery_storage_v1.BigQueryReadClient() 

1028 

1029 # Initialize request argument(s) 

1030 request = bigquery_storage_v1.SplitReadStreamRequest( 

1031 name="name_value", 

1032 ) 

1033 

1034 # Make the request 

1035 response = client.split_read_stream(request=request) 

1036 

1037 # Handle the response 

1038 print(response) 

1039 

1040 Args: 

1041 request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]): 

1042 The request object. Request message for ``SplitReadStream``. 

1043 retry (google.api_core.retry.Retry): Designation of what errors, if any, 

1044 should be retried. 

1045 timeout (float): The timeout for this request. 

1046 metadata (Sequence[Tuple[str, str]]): Strings which should be 

1047 sent along with the request as metadata. 

1048 

1049 Returns: 

1050 google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse: 

1051 Response message for SplitReadStream. 

1052 """ 

1053 # Create or coerce a protobuf request object. 

1054 # Minor optimization to avoid making a copy if the user passes 

1055 # in a storage.SplitReadStreamRequest. 

1056 # There's no risk of modifying the input as we've already verified 

1057 # there are no flattened fields. 

1058 if not isinstance(request, storage.SplitReadStreamRequest): 

1059 request = storage.SplitReadStreamRequest(request) 

1060 

1061 # Wrap the RPC method; this adds retry and timeout information, 

1062 # and friendly error handling. 

1063 rpc = self._transport._wrapped_methods[self._transport.split_read_stream] 

1064 

1065 # Certain fields should be provided within the metadata header; 

1066 # add these here. 

1067 metadata = tuple(metadata) + ( 

1068 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

1069 ) 

1070 

1071 # Validate the universe domain. 

1072 self._validate_universe_domain() 

1073 

1074 # Send the request. 

1075 response = rpc( 

1076 request, 

1077 retry=retry, 

1078 timeout=timeout, 

1079 metadata=metadata, 

1080 ) 

1081 

1082 # Done; return the response. 

1083 return response 

1084 

    def __enter__(self) -> "BigQueryReadClient":
        # Support use as a context manager: returns the client itself;
        # __exit__ closes the underlying transport.
        return self

1087 

    def __exit__(self, type, value, traceback):
        """Releases underlying transport's resources.

        .. warning::
            ONLY use as a context manager if the transport is NOT shared
            with other clients! Exiting the with block will CLOSE the transport
            and may cause errors in other clients!

        Args:
            type: Exception type raised in the ``with`` body, or ``None``.
            value: Exception instance, or ``None``.
            traceback: Traceback object, or ``None``.

        Returns:
            None: exceptions are not suppressed and propagate to the caller.
        """
        self.transport.close()

1097 

1098 

# Default client metadata (library version) attached to outgoing requests
# for telemetry/user-agent purposes.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


# Explicit public API of this module.
__all__ = ("BigQueryReadClient",)