Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py: 44%

203 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17import os 

18import re 

19from typing import ( 

20 Dict, 

21 Mapping, 

22 MutableMapping, 

23 MutableSequence, 

24 Optional, 

25 Iterable, 

26 Sequence, 

27 Tuple, 

28 Type, 

29 Union, 

30 cast, 

31) 

32 

33from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

34 

35from google.api_core import client_options as client_options_lib 

36from google.api_core import exceptions as core_exceptions 

37from google.api_core import gapic_v1 

38from google.api_core import retry as retries 

39from google.auth import credentials as ga_credentials # type: ignore 

40from google.auth.transport import mtls # type: ignore 

41from google.auth.transport.grpc import SslCredentials # type: ignore 

42from google.auth.exceptions import MutualTLSChannelError # type: ignore 

43from google.oauth2 import service_account # type: ignore 

44 

# Type alias used to annotate the ``retry`` parameter of the RPC methods in
# this module: either an explicit ``Retry`` object or (presumably) the type of
# the ``gapic_v1.method.DEFAULT`` sentinel -- TODO confirm against api_core.
try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
except AttributeError:  # pragma: NO COVER
    # ``gapic_v1.method`` has no ``_MethodDefault`` attribute here; fall back
    # to ``object`` so the alias still accepts any sentinel value.
    OptionalRetry = Union[retries.Retry, object]  # type: ignore

49 

50from google.cloud.bigquery_storage_v1.types import arrow 

51from google.cloud.bigquery_storage_v1.types import avro 

52from google.cloud.bigquery_storage_v1.types import storage 

53from google.cloud.bigquery_storage_v1.types import stream 

54from google.protobuf import timestamp_pb2 # type: ignore 

55from .transports.base import BigQueryReadTransport, DEFAULT_CLIENT_INFO 

56from .transports.grpc import BigQueryReadGrpcTransport 

57from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport 

58 

59 

class BigQueryReadClientMeta(type):
    """Metaclass backing the BigQueryRead client.

    Transport lookup lives here, at class level, so that the helper objects
    (such as the transport registry) are not attached to every client
    instance.
    """

    # Insertion order is significant: the first entry is the default transport.
    _transport_registry = OrderedDict(
        [
            ("grpc", BigQueryReadGrpcTransport),
            ("grpc_asyncio", BigQueryReadGrpcAsyncIOTransport),
        ]
    )  # type: Dict[str, Type[BigQueryReadTransport]]

    def get_transport_class(
        cls,
        label: Optional[str] = None,
    ) -> Type[BigQueryReadTransport]:
        """Look up the transport class to instantiate.

        Args:
            label: Registry key of the wanted transport. When it is ``None``
                (or otherwise falsy) the first registered transport is
                returned.

        Returns:
            The transport class registered under ``label``, or the default.

        Raises:
            KeyError: If ``label`` is truthy but not present in the registry.
        """
        if not label:
            # Nothing specific requested: hand back the first registered entry.
            return next(iter(cls._transport_registry.values()))

        return cls._transport_registry[label]

92 

93 

94class BigQueryReadClient(metaclass=BigQueryReadClientMeta): 

95 """BigQuery Read API. 

96 The Read API can be used to read data from BigQuery. 

97 """ 

98 

99 @staticmethod 

100 def _get_default_mtls_endpoint(api_endpoint): 

101 """Converts api endpoint to mTLS endpoint. 

102 

103 Convert "*.sandbox.googleapis.com" and "*.googleapis.com" to 

104 "*.mtls.sandbox.googleapis.com" and "*.mtls.googleapis.com" respectively. 

105 Args: 

106 api_endpoint (Optional[str]): the api endpoint to convert. 

107 Returns: 

108 str: converted mTLS api endpoint. 

109 """ 

110 if not api_endpoint: 

111 return api_endpoint 

112 

113 mtls_endpoint_re = re.compile( 

114 r"(?P<name>[^.]+)(?P<mtls>\.mtls)?(?P<sandbox>\.sandbox)?(?P<googledomain>\.googleapis\.com)?" 

115 ) 

116 

117 m = mtls_endpoint_re.match(api_endpoint) 

118 name, mtls, sandbox, googledomain = m.groups() 

119 if mtls or not googledomain: 

120 return api_endpoint 

121 

122 if sandbox: 

123 return api_endpoint.replace( 

124 "sandbox.googleapis.com", "mtls.sandbox.googleapis.com" 

125 ) 

126 

127 return api_endpoint.replace(".googleapis.com", ".mtls.googleapis.com") 

128 

# Host used when neither client options nor the mTLS settings pick another one.
DEFAULT_ENDPOINT = "bigquerystorage.googleapis.com"
# mTLS counterpart of DEFAULT_ENDPOINT, derived once while the class body runs.
# The helper is reached through ``__func__`` because, at this point, the name
# is still bound to the ``staticmethod`` wrapper rather than a plain function.
DEFAULT_MTLS_ENDPOINT = _get_default_mtls_endpoint.__func__(  # type: ignore
    DEFAULT_ENDPOINT
)

133 

@classmethod
def from_service_account_info(cls, info: dict, *args, **kwargs):
    """Build a client from in-memory service account key data.

    Args:
        info (dict): The service account private key info.
        args: Positional arguments forwarded to the constructor.
        kwargs: Keyword arguments forwarded to the constructor. Any
            ``credentials`` entry is replaced by the credentials built
            from ``info``.

    Returns:
        BigQueryReadClient: The constructed client.
    """
    kwargs["credentials"] = service_account.Credentials.from_service_account_info(
        info
    )
    return cls(*args, **kwargs)

150 

@classmethod
def from_service_account_file(cls, filename: str, *args, **kwargs):
    """Build a client from a service account key file.

    Args:
        filename (str): The path to the service account private key json
            file.
        args: Positional arguments forwarded to the constructor.
        kwargs: Keyword arguments forwarded to the constructor. Any
            ``credentials`` entry is replaced by the credentials loaded
            from ``filename``.

    Returns:
        BigQueryReadClient: The constructed client.
    """
    kwargs["credentials"] = service_account.Credentials.from_service_account_file(
        filename
    )
    return cls(*args, **kwargs)

# Second name for the same constructor.
from_service_account_json = from_service_account_file

170 

@property
def transport(self) -> BigQueryReadTransport:
    """Return the transport object held by this client.

    Returns:
        BigQueryReadTransport: The value stored in ``self._transport``,
            which the constructor sets to either the caller-supplied
            transport instance or a newly created one.
    """
    return self._transport

180 

181 @staticmethod 

182 def read_session_path( 

183 project: str, 

184 location: str, 

185 session: str, 

186 ) -> str: 

187 """Returns a fully-qualified read_session string.""" 

188 return "projects/{project}/locations/{location}/sessions/{session}".format( 

189 project=project, 

190 location=location, 

191 session=session, 

192 ) 

193 

194 @staticmethod 

195 def parse_read_session_path(path: str) -> Dict[str, str]: 

196 """Parses a read_session path into its component segments.""" 

197 m = re.match( 

198 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)$", 

199 path, 

200 ) 

201 return m.groupdict() if m else {} 

202 

203 @staticmethod 

204 def read_stream_path( 

205 project: str, 

206 location: str, 

207 session: str, 

208 stream: str, 

209 ) -> str: 

210 """Returns a fully-qualified read_stream string.""" 

211 return "projects/{project}/locations/{location}/sessions/{session}/streams/{stream}".format( 

212 project=project, 

213 location=location, 

214 session=session, 

215 stream=stream, 

216 ) 

217 

218 @staticmethod 

219 def parse_read_stream_path(path: str) -> Dict[str, str]: 

220 """Parses a read_stream path into its component segments.""" 

221 m = re.match( 

222 r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)/sessions/(?P<session>.+?)/streams/(?P<stream>.+?)$", 

223 path, 

224 ) 

225 return m.groupdict() if m else {} 

226 

227 @staticmethod 

228 def table_path( 

229 project: str, 

230 dataset: str, 

231 table: str, 

232 ) -> str: 

233 """Returns a fully-qualified table string.""" 

234 return "projects/{project}/datasets/{dataset}/tables/{table}".format( 

235 project=project, 

236 dataset=dataset, 

237 table=table, 

238 ) 

239 

240 @staticmethod 

241 def parse_table_path(path: str) -> Dict[str, str]: 

242 """Parses a table path into its component segments.""" 

243 m = re.match( 

244 r"^projects/(?P<project>.+?)/datasets/(?P<dataset>.+?)/tables/(?P<table>.+?)$", 

245 path, 

246 ) 

247 return m.groupdict() if m else {} 

248 

249 @staticmethod 

250 def common_billing_account_path( 

251 billing_account: str, 

252 ) -> str: 

253 """Returns a fully-qualified billing_account string.""" 

254 return "billingAccounts/{billing_account}".format( 

255 billing_account=billing_account, 

256 ) 

257 

258 @staticmethod 

259 def parse_common_billing_account_path(path: str) -> Dict[str, str]: 

260 """Parse a billing_account path into its component segments.""" 

261 m = re.match(r"^billingAccounts/(?P<billing_account>.+?)$", path) 

262 return m.groupdict() if m else {} 

263 

264 @staticmethod 

265 def common_folder_path( 

266 folder: str, 

267 ) -> str: 

268 """Returns a fully-qualified folder string.""" 

269 return "folders/{folder}".format( 

270 folder=folder, 

271 ) 

272 

273 @staticmethod 

274 def parse_common_folder_path(path: str) -> Dict[str, str]: 

275 """Parse a folder path into its component segments.""" 

276 m = re.match(r"^folders/(?P<folder>.+?)$", path) 

277 return m.groupdict() if m else {} 

278 

279 @staticmethod 

280 def common_organization_path( 

281 organization: str, 

282 ) -> str: 

283 """Returns a fully-qualified organization string.""" 

284 return "organizations/{organization}".format( 

285 organization=organization, 

286 ) 

287 

288 @staticmethod 

289 def parse_common_organization_path(path: str) -> Dict[str, str]: 

290 """Parse a organization path into its component segments.""" 

291 m = re.match(r"^organizations/(?P<organization>.+?)$", path) 

292 return m.groupdict() if m else {} 

293 

294 @staticmethod 

295 def common_project_path( 

296 project: str, 

297 ) -> str: 

298 """Returns a fully-qualified project string.""" 

299 return "projects/{project}".format( 

300 project=project, 

301 ) 

302 

303 @staticmethod 

304 def parse_common_project_path(path: str) -> Dict[str, str]: 

305 """Parse a project path into its component segments.""" 

306 m = re.match(r"^projects/(?P<project>.+?)$", path) 

307 return m.groupdict() if m else {} 

308 

309 @staticmethod 

310 def common_location_path( 

311 project: str, 

312 location: str, 

313 ) -> str: 

314 """Returns a fully-qualified location string.""" 

315 return "projects/{project}/locations/{location}".format( 

316 project=project, 

317 location=location, 

318 ) 

319 

320 @staticmethod 

321 def parse_common_location_path(path: str) -> Dict[str, str]: 

322 """Parse a location path into its component segments.""" 

323 m = re.match(r"^projects/(?P<project>.+?)/locations/(?P<location>.+?)$", path) 

324 return m.groupdict() if m else {} 

325 

@classmethod
def get_mtls_endpoint_and_cert_source(
    cls, client_options: Optional[client_options_lib.ClientOptions] = None
):
    """Work out the API endpoint and client cert source for mutual TLS.

    Client cert source:
    (1) unless the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable
    is "true", the client cert source is None.
    (2) otherwise `client_options.client_cert_source` is used when provided;
    failing that, the default client cert source is used when one exists;
    failing that, the client cert source is None.

    API endpoint:
    (1) `client_options.api_endpoint` is used when it is provided.
    (2) otherwise, if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment
    variable is "always", the default mTLS endpoint is used; if it is
    "never", the default API endpoint is used; if it is "auto" (the
    default), the default mTLS endpoint is used when a client cert source
    was found and the default API endpoint otherwise.

    More details can be found at https://google.aip.dev/auth/4114.

    Args:
        client_options (google.api_core.client_options.ClientOptions): Custom
            options for the client. Only the `api_endpoint` and
            `client_cert_source` properties are read by this method.

    Returns:
        Tuple[str, Callable[[], Tuple[bytes, bytes]]]: the API endpoint and
            the client cert source to use.

    Raises:
        ValueError: If `GOOGLE_API_USE_CLIENT_CERTIFICATE` is neither
            "true" nor "false".
        google.auth.exceptions.MutualTLSChannelError: If
            `GOOGLE_API_USE_MTLS_ENDPOINT` is not one of "never", "auto" or
            "always".
    """
    options = (
        client_options
        if client_options is not None
        else client_options_lib.ClientOptions()
    )

    cert_mode = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false")
    mtls_mode = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto")

    # Validate both switches up front, certificate switch first.
    if cert_mode not in ("true", "false"):
        raise ValueError(
            "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
        )
    if mtls_mode not in ("auto", "never", "always"):
        raise MutualTLSChannelError(
            "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
        )

    # Resolve the certificate source: explicit option wins over the default.
    cert_source = None
    if cert_mode == "true":
        explicit_source = options.client_cert_source
        if explicit_source:
            cert_source = explicit_source
        elif mtls.has_default_client_cert_source():
            cert_source = mtls.default_client_cert_source()

    # Resolve the endpoint: an explicit endpoint short-circuits everything.
    explicit_endpoint = options.api_endpoint
    if explicit_endpoint is not None:
        return explicit_endpoint, cert_source

    wants_mtls = mtls_mode == "always" or (mtls_mode == "auto" and cert_source)
    if wants_mtls:
        return cls.DEFAULT_MTLS_ENDPOINT, cert_source
    return cls.DEFAULT_ENDPOINT, cert_source

392 

def __init__(
    self,
    *,
    credentials: Optional[ga_credentials.Credentials] = None,
    transport: Optional[Union[str, BigQueryReadTransport]] = None,
    client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
    client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
    """Instantiates the big query read client.

    Args:
        credentials (Optional[google.auth.credentials.Credentials]): The
            authorization credentials to attach to requests. They identify
            the application to the service; when omitted, the client tries
            to determine credentials from the environment.
        transport (Union[str, BigQueryReadTransport]): The transport to
            use, given either as a registry label or as a ready-made
            transport instance. When None, a transport is chosen
            automatically.
        client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
            Custom options for the client. They have no effect when a
            ``transport`` instance is provided.
            (1) ``api_endpoint`` overrides the default endpoint. The
            GOOGLE_API_USE_MTLS_ENDPOINT environment variable can also
            select the endpoint: "always" (always use the default mTLS
            endpoint), "never" (always use the default regular endpoint)
            and "auto" (switch to the default mTLS endpoint when a client
            certificate is present; this is the default value). An
            explicit ``api_endpoint`` takes precedence.
            (2) When the GOOGLE_API_USE_CLIENT_CERTIFICATE environment
            variable is "true", ``client_cert_source`` can supply the
            client certificate for mutual TLS transport; when it is not
            supplied, the default SSL client certificate is used if
            present. When the variable is "false" or unset, no client
            certificate is used.
        client_info (google.api_core.gapic_v1.client_info.ClientInfo):
            The client info used to send a user-agent string along with
            API requests. If ``None``, then default info will be used.
            Generally, you only need to set this if you're developing
            your own client library.

    Raises:
        ValueError: If ``client_options.api_key`` and ``credentials`` are
            both given, or if credentials, a credentials file, an API key
            or scopes are given together with a transport instance.
        google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
            creation failed for any reason.
    """
    # Normalise ``client_options`` to a ClientOptions instance.
    if isinstance(client_options, dict):
        client_options = client_options_lib.from_dict(client_options)
    if client_options is None:
        client_options = client_options_lib.ClientOptions()
    options = cast(client_options_lib.ClientOptions, client_options)

    endpoint_and_cert = self.get_mtls_endpoint_and_cert_source(options)
    api_endpoint, client_cert_source_func = endpoint_and_cert

    api_key_value = getattr(options, "api_key", None)
    if api_key_value and credentials:
        raise ValueError(
            "client_options.api_key and credentials are mutually exclusive"
        )

    # A ready-made transport instance is an extensibility point for unusual
    # situations; it must already carry its own credentials and scopes.
    if isinstance(transport, BigQueryReadTransport):
        if credentials or options.credentials_file or api_key_value:
            raise ValueError(
                "When providing a transport instance, provide its credentials directly."
            )
        if options.scopes:
            raise ValueError(
                "When providing a transport instance, provide its scopes directly."
            )
        self._transport = transport
        return

    # Otherwise build the transport ourselves.
    import google.auth._default  # type: ignore

    auth_default = google.auth._default
    if api_key_value and hasattr(auth_default, "get_api_key_credentials"):
        credentials = auth_default.get_api_key_credentials(api_key_value)

    transport_cls = type(self).get_transport_class(transport)
    self._transport = transport_cls(
        credentials=credentials,
        credentials_file=options.credentials_file,
        host=api_endpoint,
        scopes=options.scopes,
        client_cert_source_for_mtls=client_cert_source_func,
        quota_project_id=options.quota_project_id,
        client_info=client_info,
        always_use_jwt_access=True,
        api_audience=options.api_audience,
    )

492 

def create_read_session(
    self,
    request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
    *,
    parent: Optional[str] = None,
    read_session: Optional[stream.ReadSession] = None,
    max_stream_count: Optional[int] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = (),
) -> stream.ReadSession:
    r"""Creates a new read session.

    A read session divides the contents of a BigQuery table into one or
    more streams, which can then be used to read data from the table. The
    session also specifies properties of the data to be read, such as a
    list of columns or a push-down filter describing the rows to return.

    A particular row can be read by at most one stream. Once the caller
    has reached the end of every stream in the session, all the data in
    the table has been read.

    Data is assigned to the streams so that roughly the same number of
    rows can be read from each. Because the server-side unit of assignment
    is a collection of rows, the API does not guarantee that every stream
    returns the same number of rows. The limits are also enforced on the
    number of pre-filtered rows, so some filters can lead to lopsided
    assignments.

    Read sessions automatically expire 6 hours after they are created and
    do not require manual clean-up by the caller.

    .. code-block:: python

        # Code template only; it may need correct/in-range request values
        # and possibly a regional endpoint, see:
        # https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        def sample_create_read_session():
            client = bigquery_storage_v1.BigQueryReadClient()

            request = bigquery_storage_v1.CreateReadSessionRequest(
                parent="parent_value",
            )

            response = client.create_read_session(request=request)
            print(response)

    Args:
        request (Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]):
            The request object. Request message for ``CreateReadSession``.
        parent (str):
            Required. The request project that owns the session, in
            the form of ``projects/{project_id}``.

            This corresponds to the ``parent`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        read_session (google.cloud.bigquery_storage_v1.types.ReadSession):
            Required. Session to be created.
            This corresponds to the ``read_session`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        max_stream_count (int):
            Max initial number of streams. If unset or zero, the server
            picks a number of streams that gives reasonable throughput.
            Must be non-negative. Fewer streams than requested may be
            returned, depending on how much parallelism is reasonable for
            the table. There is a default system max limit of 1,000.

            This must be greater than or equal to
            preferred_min_stream_count. Typically, clients should either
            leave this unset and let the system determine an upper bound,
            or set it to the maximum number of "units of work" they can
            gracefully handle.

            This corresponds to the ``max_stream_count`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry.Retry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, str]]): Strings which should be
            sent along with the request as metadata.

    Returns:
        google.cloud.bigquery_storage_v1.types.ReadSession:
            Information about the ReadSession.

    Raises:
        ValueError: If ``request`` is given together with any truthy
            flattened field argument.
    """
    # A request object and flattened field arguments are mutually exclusive.
    flattened_given = any((parent, read_session, max_stream_count))
    if flattened_given and request is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Only coerce (and therefore copy) when the caller did not already pass
    # a CreateReadSessionRequest; flattened fields are applied to the new
    # object only, so a caller-supplied request is never modified.
    if not isinstance(request, storage.CreateReadSessionRequest):
        request = storage.CreateReadSessionRequest(request)
        flattened_fields = (
            ("parent", parent),
            ("read_session", read_session),
            ("max_stream_count", max_stream_count),
        )
        for field_name, field_value in flattened_fields:
            if field_value is not None:
                setattr(request, field_name, field_value)

    # The wrapped method carries the retry/timeout defaults and friendly
    # error handling.
    wrapped_rpc = self._transport._wrapped_methods[
        self._transport.create_read_session
    ]

    # Append the routing header derived from the request to the metadata.
    caller_metadata = tuple(metadata)
    routing_header = gapic_v1.routing_header.to_grpc_metadata(
        (("read_session.table", request.read_session.table),)
    )
    metadata = caller_metadata + (routing_header,)

    # Send the request and hand the response straight back.
    return wrapped_rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

644 

def read_rows(
    self,
    request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
    *,
    read_stream: Optional[str] = None,
    offset: Optional[int] = None,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = (),
) -> Iterable[storage.ReadRowsResponse]:
    r"""Reads rows from the stream in the format prescribed by the
    ReadSession.

    Each response contains one or more table rows, up to a maximum of
    100 MiB per response; read requests which attempt to read individual
    rows larger than 100 MiB will fail.

    Each request also returns a set of stream statistics reflecting the
    current state of the stream.

    .. code-block:: python

        # Code template only; it may need correct/in-range request values
        # and possibly a regional endpoint, see:
        # https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        def sample_read_rows():
            client = bigquery_storage_v1.BigQueryReadClient()

            request = bigquery_storage_v1.ReadRowsRequest(
                read_stream="read_stream_value",
            )

            stream = client.read_rows(request=request)
            for response in stream:
                print(response)

    Args:
        request (Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]):
            The request object. Request message for ``ReadRows``.
        read_stream (str):
            Required. Stream to read rows from.
            This corresponds to the ``read_stream`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        offset (int):
            The offset requested must be less than the last row read
            from Read. Requesting a larger offset is undefined. If not
            specified, start reading from offset zero.

            This corresponds to the ``offset`` field
            on the ``request`` instance; if ``request`` is provided, this
            should not be set.
        retry (google.api_core.retry.Retry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, str]]): Strings which should be
            sent along with the request as metadata.

    Returns:
        Iterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
            Response from calling ReadRows may include row data, progress and
            throttling information.

    Raises:
        ValueError: If ``request`` is given together with any truthy
            flattened field argument.
    """
    # A request object and flattened field arguments are mutually exclusive.
    flattened_given = any((read_stream, offset))
    if flattened_given and request is not None:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )

    # Only coerce (and therefore copy) when the caller did not already pass
    # a ReadRowsRequest; flattened fields are applied to the new object
    # only, so a caller-supplied request is never modified.
    if not isinstance(request, storage.ReadRowsRequest):
        request = storage.ReadRowsRequest(request)
        flattened_fields = (
            ("read_stream", read_stream),
            ("offset", offset),
        )
        for field_name, field_value in flattened_fields:
            if field_value is not None:
                setattr(request, field_name, field_value)

    # The wrapped method carries the retry/timeout defaults and friendly
    # error handling.
    wrapped_rpc = self._transport._wrapped_methods[self._transport.read_rows]

    # Append the routing header derived from the request to the metadata.
    caller_metadata = tuple(metadata)
    routing_header = gapic_v1.routing_header.to_grpc_metadata(
        (("read_stream", request.read_stream),)
    )
    metadata = caller_metadata + (routing_header,)

    # Send the request and hand the response straight back.
    return wrapped_rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

766 

def split_read_stream(
    self,
    request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
    *,
    retry: OptionalRetry = gapic_v1.method.DEFAULT,
    timeout: Union[float, object] = gapic_v1.method.DEFAULT,
    metadata: Sequence[Tuple[str, str]] = (),
) -> storage.SplitReadStreamResponse:
    r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.

    The two objects are referred to as the primary and the residual
    streams of the split. The original ``ReadStream`` can still be read
    from in the same manner as before. Both returned ``ReadStream``
    objects can also be read from, and the rows returned by both child
    streams will be the same as the rows read from the original stream.

    Moreover, the two child streams will be allocated back-to-back in the
    original ``ReadStream``. Concretely, it is guaranteed that for streams
    original, primary, and residual, that original[0-j] = primary[0-j] and
    original[j-n] = residual[0-m] once the streams have been read to
    completion.

    .. code-block:: python

        # Code template only; it may need correct/in-range request values
        # and possibly a regional endpoint, see:
        # https://googleapis.dev/python/google-api-core/latest/client_options.html
        from google.cloud import bigquery_storage_v1

        def sample_split_read_stream():
            client = bigquery_storage_v1.BigQueryReadClient()

            request = bigquery_storage_v1.SplitReadStreamRequest(
                name="name_value",
            )

            response = client.split_read_stream(request=request)
            print(response)

    Args:
        request (Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]):
            The request object. Request message for ``SplitReadStream``.
        retry (google.api_core.retry.Retry): Designation of what errors, if any,
            should be retried.
        timeout (float): The timeout for this request.
        metadata (Sequence[Tuple[str, str]]): Strings which should be
            sent along with the request as metadata.

    Returns:
        google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
            Response message for SplitReadStream.
    """
    # Only coerce (and therefore copy) when the caller did not already pass
    # a SplitReadStreamRequest. This method has no flattened fields.
    already_typed = isinstance(request, storage.SplitReadStreamRequest)
    if not already_typed:
        request = storage.SplitReadStreamRequest(request)

    # The wrapped method carries the retry/timeout defaults and friendly
    # error handling.
    wrapped_rpc = self._transport._wrapped_methods[
        self._transport.split_read_stream
    ]

    # Append the routing header derived from the request to the metadata.
    caller_metadata = tuple(metadata)
    routing_header = gapic_v1.routing_header.to_grpc_metadata(
        (("name", request.name),)
    )
    metadata = caller_metadata + (routing_header,)

    # Send the request and hand the response straight back.
    return wrapped_rpc(
        request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

856 

def __enter__(self) -> "BigQueryReadClient":
    """Enter the ``with`` block; the client itself is the bound value."""
    return self

859 

860 def __exit__(self, type, value, traceback): 

861 """Releases underlying transport's resources. 

862 

863 .. warning:: 

864 ONLY use as a context manager if the transport is NOT shared 

865 with other clients! Exiting the with block will CLOSE the transport 

866 and may cause errors in other clients! 

867 """ 

868 self.transport.close() 

869 

870 

# Module-level client info carrying this package's GAPIC version.
# NOTE(review): this rebinds the ``DEFAULT_CLIENT_INFO`` name imported from
# ``.transports.base`` at the top of the module. Default argument values that
# were evaluated earlier (e.g. ``client_info`` in ``BigQueryReadClient.__init__``)
# still hold the imported object -- confirm this is intended.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


# Only the client class is exported by ``from ... import *``.
__all__ = ("BigQueryReadClient",)