Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

115 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from collections import OrderedDict 

17import logging as std_logging 

18import re 

19from typing import ( 

20 AsyncIterable, 

21 Awaitable, 

22 Callable, 

23 Dict, 

24 Mapping, 

25 MutableMapping, 

26 MutableSequence, 

27 Optional, 

28 Sequence, 

29 Tuple, 

30 Type, 

31 Union, 

32) 

33 

34from google.api_core import exceptions as core_exceptions 

35from google.api_core import gapic_v1 

36from google.api_core import retry_async as retries 

37from google.api_core.client_options import ClientOptions 

38from google.auth import credentials as ga_credentials # type: ignore 

39from google.oauth2 import service_account # type: ignore 

40import google.protobuf 

41 

42from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

43 

44try: 

45 OptionalRetry = Union[retries.AsyncRetry, gapic_v1.method._MethodDefault, None] 

46except AttributeError: # pragma: NO COVER 

47 OptionalRetry = Union[retries.AsyncRetry, object, None] # type: ignore 

48 

49from google.protobuf import timestamp_pb2 # type: ignore 

50 

51from google.cloud.bigquery_storage_v1.types import arrow, avro, storage, stream 

52 

53from .client import BigQueryReadClient 

54from .transports.base import DEFAULT_CLIENT_INFO, BigQueryReadTransport 

55from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport 

56 

57try: 

58 from google.api_core import client_logging # type: ignore 

59 

60 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER 

61except ImportError: # pragma: NO COVER 

62 CLIENT_LOGGING_SUPPORTED = False 

63 

64_LOGGER = std_logging.getLogger(__name__) 

65 

66 

67class BigQueryReadAsyncClient: 

68 """BigQuery Read API. 

69 

70 The Read API can be used to read data from BigQuery. 

71 """ 

72 

73 _client: BigQueryReadClient 

74 

75 # Copy defaults from the synchronous client for use here. 

76 # Note: DEFAULT_ENDPOINT is deprecated. Use _DEFAULT_ENDPOINT_TEMPLATE instead. 

77 DEFAULT_ENDPOINT = BigQueryReadClient.DEFAULT_ENDPOINT 

78 DEFAULT_MTLS_ENDPOINT = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT 

79 _DEFAULT_ENDPOINT_TEMPLATE = BigQueryReadClient._DEFAULT_ENDPOINT_TEMPLATE 

80 _DEFAULT_UNIVERSE = BigQueryReadClient._DEFAULT_UNIVERSE 

81 

82 read_session_path = staticmethod(BigQueryReadClient.read_session_path) 

83 parse_read_session_path = staticmethod(BigQueryReadClient.parse_read_session_path) 

84 read_stream_path = staticmethod(BigQueryReadClient.read_stream_path) 

85 parse_read_stream_path = staticmethod(BigQueryReadClient.parse_read_stream_path) 

86 table_path = staticmethod(BigQueryReadClient.table_path) 

87 parse_table_path = staticmethod(BigQueryReadClient.parse_table_path) 

88 common_billing_account_path = staticmethod( 

89 BigQueryReadClient.common_billing_account_path 

90 ) 

91 parse_common_billing_account_path = staticmethod( 

92 BigQueryReadClient.parse_common_billing_account_path 

93 ) 

94 common_folder_path = staticmethod(BigQueryReadClient.common_folder_path) 

95 parse_common_folder_path = staticmethod(BigQueryReadClient.parse_common_folder_path) 

96 common_organization_path = staticmethod(BigQueryReadClient.common_organization_path) 

97 parse_common_organization_path = staticmethod( 

98 BigQueryReadClient.parse_common_organization_path 

99 ) 

100 common_project_path = staticmethod(BigQueryReadClient.common_project_path) 

101 parse_common_project_path = staticmethod( 

102 BigQueryReadClient.parse_common_project_path 

103 ) 

104 common_location_path = staticmethod(BigQueryReadClient.common_location_path) 

105 parse_common_location_path = staticmethod( 

106 BigQueryReadClient.parse_common_location_path 

107 ) 

108 

109 @classmethod 

110 def from_service_account_info(cls, info: dict, *args, **kwargs): 

111 """Creates an instance of this client using the provided credentials 

112 info. 

113 

114 Args: 

115 info (dict): The service account private key info. 

116 args: Additional arguments to pass to the constructor. 

117 kwargs: Additional arguments to pass to the constructor. 

118 

119 Returns: 

120 BigQueryReadAsyncClient: The constructed client. 

121 """ 

122 return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs) # type: ignore 

123 

124 @classmethod 

125 def from_service_account_file(cls, filename: str, *args, **kwargs): 

126 """Creates an instance of this client using the provided credentials 

127 file. 

128 

129 Args: 

130 filename (str): The path to the service account private key json 

131 file. 

132 args: Additional arguments to pass to the constructor. 

133 kwargs: Additional arguments to pass to the constructor. 

134 

135 Returns: 

136 BigQueryReadAsyncClient: The constructed client. 

137 """ 

138 return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs) # type: ignore 

139 

140 from_service_account_json = from_service_account_file 

141 

142 @classmethod 

143 def get_mtls_endpoint_and_cert_source( 

144 cls, client_options: Optional[ClientOptions] = None 

145 ): 

146 """Return the API endpoint and client cert source for mutual TLS. 

147 

148 The client cert source is determined in the following order: 

149 (1) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not "true", the 

150 client cert source is None. 

151 (2) if `client_options.client_cert_source` is provided, use the provided one; if the 

152 default client cert source exists, use the default one; otherwise the client cert 

153 source is None. 

154 

155 The API endpoint is determined in the following order: 

156 (1) if `client_options.api_endpoint` if provided, use the provided one. 

157 (2) if `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is "always", use the 

158 default mTLS endpoint; if the environment variable is "never", use the default API 

159 endpoint; otherwise if client cert source exists, use the default mTLS endpoint, otherwise 

160 use the default API endpoint. 

161 

162 More details can be found at https://google.aip.dev/auth/4114. 

163 

164 Args: 

165 client_options (google.api_core.client_options.ClientOptions): Custom options for the 

166 client. Only the `api_endpoint` and `client_cert_source` properties may be used 

167 in this method. 

168 

169 Returns: 

170 Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the 

171 client cert source to use. 

172 

173 Raises: 

174 google.auth.exceptions.MutualTLSChannelError: If any errors happen. 

175 """ 

176 return BigQueryReadClient.get_mtls_endpoint_and_cert_source(client_options) # type: ignore 

177 

178 @property 

179 def transport(self) -> BigQueryReadTransport: 

180 """Returns the transport used by the client instance. 

181 

182 Returns: 

183 BigQueryReadTransport: The transport used by the client instance. 

184 """ 

185 return self._client.transport 

186 

187 @property 

188 def api_endpoint(self): 

189 """Return the API endpoint used by the client instance. 

190 

191 Returns: 

192 str: The API endpoint used by the client instance. 

193 """ 

194 return self._client._api_endpoint 

195 

196 @property 

197 def universe_domain(self) -> str: 

198 """Return the universe domain used by the client instance. 

199 

200 Returns: 

201 str: The universe domain used 

202 by the client instance. 

203 """ 

204 return self._client._universe_domain 

205 

206 get_transport_class = BigQueryReadClient.get_transport_class 

207 

208 def __init__( 

209 self, 

210 *, 

211 credentials: Optional[ga_credentials.Credentials] = None, 

212 transport: Optional[ 

213 Union[str, BigQueryReadTransport, Callable[..., BigQueryReadTransport]] 

214 ] = "grpc_asyncio", 

215 client_options: Optional[ClientOptions] = None, 

216 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

217 ) -> None: 

218 """Instantiates the big query read async client. 

219 

220 Args: 

221 credentials (Optional[google.auth.credentials.Credentials]): The 

222 authorization credentials to attach to requests. These 

223 credentials identify the application to the service; if none 

224 are specified, the client will attempt to ascertain the 

225 credentials from the environment. 

226 transport (Optional[Union[str,BigQueryReadTransport,Callable[..., BigQueryReadTransport]]]): 

227 The transport to use, or a Callable that constructs and returns a new transport to use. 

228 If a Callable is given, it will be called with the same set of initialization 

229 arguments as used in the BigQueryReadTransport constructor. 

230 If set to None, a transport is chosen automatically. 

231 client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): 

232 Custom options for the client. 

233 

234 1. The ``api_endpoint`` property can be used to override the 

235 default endpoint provided by the client when ``transport`` is 

236 not explicitly provided. Only if this property is not set and 

237 ``transport`` was not explicitly provided, the endpoint is 

238 determined by the GOOGLE_API_USE_MTLS_ENDPOINT environment 

239 variable, which have one of the following values: 

240 "always" (always use the default mTLS endpoint), "never" (always 

241 use the default regular endpoint) and "auto" (auto-switch to the 

242 default mTLS endpoint if client certificate is present; this is 

243 the default value). 

244 

245 2. If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment variable 

246 is "true", then the ``client_cert_source`` property can be used 

247 to provide a client certificate for mTLS transport. If 

248 not provided, the default SSL client certificate will be used if 

249 present. If GOOGLE_API_USE_CLIENT_CERTIFICATE is "false" or not 

250 set, no client certificate will be used. 

251 

252 3. The ``universe_domain`` property can be used to override the 

253 default "googleapis.com" universe. Note that ``api_endpoint`` 

254 property still takes precedence; and ``universe_domain`` is 

255 currently not supported for mTLS. 

256 

257 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

258 The client info used to send a user-agent string along with 

259 API requests. If ``None``, then default info will be used. 

260 Generally, you only need to set this if you're developing 

261 your own client library. 

262 

263 Raises: 

264 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport 

265 creation failed for any reason. 

266 """ 

267 self._client = BigQueryReadClient( 

268 credentials=credentials, 

269 transport=transport, 

270 client_options=client_options, 

271 client_info=client_info, 

272 ) 

273 

274 if CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor( 

275 std_logging.DEBUG 

276 ): # pragma: NO COVER 

277 _LOGGER.debug( 

278 "Created client `google.cloud.bigquery.storage_v1.BigQueryReadAsyncClient`.", 

279 extra={ 

280 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead", 

281 "universeDomain": getattr( 

282 self._client._transport._credentials, "universe_domain", "" 

283 ), 

284 "credentialsType": f"{type(self._client._transport._credentials).__module__}.{type(self._client._transport._credentials).__qualname__}", 

285 "credentialsInfo": getattr( 

286 self.transport._credentials, "get_cred_info", lambda: None 

287 )(), 

288 } 

289 if hasattr(self._client._transport, "_credentials") 

290 else { 

291 "serviceName": "google.cloud.bigquery.storage.v1.BigQueryRead", 

292 "credentialsType": None, 

293 }, 

294 ) 

295 

296 async def create_read_session( 

297 self, 

298 request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None, 

299 *, 

300 parent: Optional[str] = None, 

301 read_session: Optional[stream.ReadSession] = None, 

302 max_stream_count: Optional[int] = None, 

303 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

304 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

305 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

306 ) -> stream.ReadSession: 

307 r"""Creates a new read session. A read session divides 

308 the contents of a BigQuery table into one or more 

309 streams, which can then be used to read data from the 

310 table. The read session also specifies properties of the 

311 data to be read, such as a list of columns or a 

312 push-down filter describing the rows to be returned. 

313 

314 A particular row can be read by at most one stream. When 

315 the caller has reached the end of each stream in the 

316 session, then all the data in the table has been read. 

317 

318 Data is assigned to each stream such that roughly the 

319 same number of rows can be read from each stream. 

320 Because the server-side unit for assigning data is 

321 collections of rows, the API does not guarantee that 

322 each stream will return the same number or rows. 

323 Additionally, the limits are enforced based on the 

324 number of pre-filtered rows, so some filters can lead to 

325 lopsided assignments. 

326 

327 Read sessions automatically expire 6 hours after they 

328 are created and do not require manual clean-up by the 

329 caller. 

330 

331 .. code-block:: python 

332 

333 # This snippet has been automatically generated and should be regarded as a 

334 # code template only. 

335 # It will require modifications to work: 

336 # - It may require correct/in-range values for request initialization. 

337 # - It may require specifying regional endpoints when creating the service 

338 # client as shown in: 

339 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

340 from google.cloud import bigquery_storage_v1 

341 

342 async def sample_create_read_session(): 

343 # Create a client 

344 client = bigquery_storage_v1.BigQueryReadAsyncClient() 

345 

346 # Initialize request argument(s) 

347 request = bigquery_storage_v1.CreateReadSessionRequest( 

348 parent="parent_value", 

349 ) 

350 

351 # Make the request 

352 response = await client.create_read_session(request=request) 

353 

354 # Handle the response 

355 print(response) 

356 

357 Args: 

358 request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]]): 

359 The request object. Request message for ``CreateReadSession``. 

360 parent (:class:`str`): 

361 Required. The request project that owns the session, in 

362 the form of ``projects/{project_id}``. 

363 

364 This corresponds to the ``parent`` field 

365 on the ``request`` instance; if ``request`` is provided, this 

366 should not be set. 

367 read_session (:class:`google.cloud.bigquery_storage_v1.types.ReadSession`): 

368 Required. Session to be created. 

369 This corresponds to the ``read_session`` field 

370 on the ``request`` instance; if ``request`` is provided, this 

371 should not be set. 

372 max_stream_count (:class:`int`): 

373 Max initial number of streams. If unset or zero, the 

374 server will provide a value of streams so as to produce 

375 reasonable throughput. Must be non-negative. The number 

376 of streams may be lower than the requested number, 

377 depending on the amount parallelism that is reasonable 

378 for the table. There is a default system max limit of 

379 1,000. 

380 

381 This must be greater than or equal to 

382 preferred_min_stream_count. Typically, clients should 

383 either leave this unset to let the system to determine 

384 an upper bound OR set this a size for the maximum "units 

385 of work" it can gracefully handle. 

386 

387 This corresponds to the ``max_stream_count`` field 

388 on the ``request`` instance; if ``request`` is provided, this 

389 should not be set. 

390 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any, 

391 should be retried. 

392 timeout (float): The timeout for this request. 

393 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

394 sent along with the request as metadata. Normally, each value must be of type `str`, 

395 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

396 be of type `bytes`. 

397 

398 Returns: 

399 google.cloud.bigquery_storage_v1.types.ReadSession: 

400 Information about the ReadSession. 

401 """ 

402 # Create or coerce a protobuf request object. 

403 # - Quick check: If we got a request object, we should *not* have 

404 # gotten any keyword arguments that map to the request. 

405 flattened_params = [parent, read_session, max_stream_count] 

406 has_flattened_params = ( 

407 len([param for param in flattened_params if param is not None]) > 0 

408 ) 

409 if request is not None and has_flattened_params: 

410 raise ValueError( 

411 "If the `request` argument is set, then none of " 

412 "the individual field arguments should be set." 

413 ) 

414 

415 # - Use the request object if provided (there's no risk of modifying the input as 

416 # there are no flattened fields), or create one. 

417 if not isinstance(request, storage.CreateReadSessionRequest): 

418 request = storage.CreateReadSessionRequest(request) 

419 

420 # If we have keyword arguments corresponding to fields on the 

421 # request, apply these. 

422 if parent is not None: 

423 request.parent = parent 

424 if read_session is not None: 

425 request.read_session = read_session 

426 if max_stream_count is not None: 

427 request.max_stream_count = max_stream_count 

428 

429 # Wrap the RPC method; this adds retry and timeout information, 

430 # and friendly error handling. 

431 rpc = self._client._transport._wrapped_methods[ 

432 self._client._transport.create_read_session 

433 ] 

434 

435 # Certain fields should be provided within the metadata header; 

436 # add these here. 

437 metadata = tuple(metadata) + ( 

438 gapic_v1.routing_header.to_grpc_metadata( 

439 (("read_session.table", request.read_session.table),) 

440 ), 

441 ) 

442 

443 # Validate the universe domain. 

444 self._client._validate_universe_domain() 

445 

446 # Send the request. 

447 response = await rpc( 

448 request, 

449 retry=retry, 

450 timeout=timeout, 

451 metadata=metadata, 

452 ) 

453 

454 # Done; return the response. 

455 return response 

456 

457 def read_rows( 

458 self, 

459 request: Optional[Union[storage.ReadRowsRequest, dict]] = None, 

460 *, 

461 read_stream: Optional[str] = None, 

462 offset: Optional[int] = None, 

463 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

464 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

465 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

466 ) -> Awaitable[AsyncIterable[storage.ReadRowsResponse]]: 

467 r"""Reads rows from the stream in the format prescribed 

468 by the ReadSession. Each response contains one or more 

469 table rows, up to a maximum of 100 MiB per response; 

470 read requests which attempt to read individual rows 

471 larger than 100 MiB will fail. 

472 

473 Each request also returns a set of stream statistics 

474 reflecting the current state of the stream. 

475 

476 .. code-block:: python 

477 

478 # This snippet has been automatically generated and should be regarded as a 

479 # code template only. 

480 # It will require modifications to work: 

481 # - It may require correct/in-range values for request initialization. 

482 # - It may require specifying regional endpoints when creating the service 

483 # client as shown in: 

484 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

485 from google.cloud import bigquery_storage_v1 

486 

487 async def sample_read_rows(): 

488 # Create a client 

489 client = bigquery_storage_v1.BigQueryReadAsyncClient() 

490 

491 # Initialize request argument(s) 

492 request = bigquery_storage_v1.ReadRowsRequest( 

493 read_stream="read_stream_value", 

494 ) 

495 

496 # Make the request 

497 stream = await client.read_rows(request=request) 

498 

499 # Handle the response 

500 async for response in stream: 

501 print(response) 

502 

503 Args: 

504 request (Optional[Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]]): 

505 The request object. Request message for ``ReadRows``. 

506 read_stream (:class:`str`): 

507 Required. Stream to read rows from. 

508 This corresponds to the ``read_stream`` field 

509 on the ``request`` instance; if ``request`` is provided, this 

510 should not be set. 

511 offset (:class:`int`): 

512 The offset requested must be less 

513 than the last row read from Read. 

514 Requesting a larger offset is undefined. 

515 If not specified, start reading from 

516 offset zero. 

517 

518 This corresponds to the ``offset`` field 

519 on the ``request`` instance; if ``request`` is provided, this 

520 should not be set. 

521 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any, 

522 should be retried. 

523 timeout (float): The timeout for this request. 

524 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

525 sent along with the request as metadata. Normally, each value must be of type `str`, 

526 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

527 be of type `bytes`. 

528 

529 Returns: 

530 AsyncIterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]: 

531 Response from calling ReadRows may include row data, progress and 

532 throttling information. 

533 

534 """ 

535 # Create or coerce a protobuf request object. 

536 # - Quick check: If we got a request object, we should *not* have 

537 # gotten any keyword arguments that map to the request. 

538 flattened_params = [read_stream, offset] 

539 has_flattened_params = ( 

540 len([param for param in flattened_params if param is not None]) > 0 

541 ) 

542 if request is not None and has_flattened_params: 

543 raise ValueError( 

544 "If the `request` argument is set, then none of " 

545 "the individual field arguments should be set." 

546 ) 

547 

548 # - Use the request object if provided (there's no risk of modifying the input as 

549 # there are no flattened fields), or create one. 

550 if not isinstance(request, storage.ReadRowsRequest): 

551 request = storage.ReadRowsRequest(request) 

552 

553 # If we have keyword arguments corresponding to fields on the 

554 # request, apply these. 

555 if read_stream is not None: 

556 request.read_stream = read_stream 

557 if offset is not None: 

558 request.offset = offset 

559 

560 # Wrap the RPC method; this adds retry and timeout information, 

561 # and friendly error handling. 

562 rpc = self._client._transport._wrapped_methods[ 

563 self._client._transport.read_rows 

564 ] 

565 

566 # Certain fields should be provided within the metadata header; 

567 # add these here. 

568 metadata = tuple(metadata) + ( 

569 gapic_v1.routing_header.to_grpc_metadata( 

570 (("read_stream", request.read_stream),) 

571 ), 

572 ) 

573 

574 # Validate the universe domain. 

575 self._client._validate_universe_domain() 

576 

577 # Send the request. 

578 response = rpc( 

579 request, 

580 retry=retry, 

581 timeout=timeout, 

582 metadata=metadata, 

583 ) 

584 

585 # Done; return the response. 

586 return response 

587 

588 async def split_read_stream( 

589 self, 

590 request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None, 

591 *, 

592 retry: OptionalRetry = gapic_v1.method.DEFAULT, 

593 timeout: Union[float, object] = gapic_v1.method.DEFAULT, 

594 metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), 

595 ) -> storage.SplitReadStreamResponse: 

596 r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects. 

597 These ``ReadStream`` objects are referred to as the primary and 

598 the residual streams of the split. The original ``ReadStream`` 

599 can still be read from in the same manner as before. Both of the 

600 returned ``ReadStream`` objects can also be read from, and the 

601 rows returned by both child streams will be the same as the rows 

602 read from the original stream. 

603 

604 Moreover, the two child streams will be allocated back-to-back 

605 in the original ``ReadStream``. Concretely, it is guaranteed 

606 that for streams original, primary, and residual, that 

607 original[0-j] = primary[0-j] and original[j-n] = residual[0-m] 

608 once the streams have been read to completion. 

609 

610 .. code-block:: python 

611 

612 # This snippet has been automatically generated and should be regarded as a 

613 # code template only. 

614 # It will require modifications to work: 

615 # - It may require correct/in-range values for request initialization. 

616 # - It may require specifying regional endpoints when creating the service 

617 # client as shown in: 

618 # https://googleapis.dev/python/google-api-core/latest/client_options.html 

619 from google.cloud import bigquery_storage_v1 

620 

621 async def sample_split_read_stream(): 

622 # Create a client 

623 client = bigquery_storage_v1.BigQueryReadAsyncClient() 

624 

625 # Initialize request argument(s) 

626 request = bigquery_storage_v1.SplitReadStreamRequest( 

627 name="name_value", 

628 ) 

629 

630 # Make the request 

631 response = await client.split_read_stream(request=request) 

632 

633 # Handle the response 

634 print(response) 

635 

636 Args: 

637 request (Optional[Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]]): 

638 The request object. Request message for ``SplitReadStream``. 

639 retry (google.api_core.retry_async.AsyncRetry): Designation of what errors, if any, 

640 should be retried. 

641 timeout (float): The timeout for this request. 

642 metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be 

643 sent along with the request as metadata. Normally, each value must be of type `str`, 

644 but for metadata keys ending with the suffix `-bin`, the corresponding values must 

645 be of type `bytes`. 

646 

647 Returns: 

648 google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse: 

649 Response message for SplitReadStream. 

650 """ 

651 # Create or coerce a protobuf request object. 

652 # - Use the request object if provided (there's no risk of modifying the input as 

653 # there are no flattened fields), or create one. 

654 if not isinstance(request, storage.SplitReadStreamRequest): 

655 request = storage.SplitReadStreamRequest(request) 

656 

657 # Wrap the RPC method; this adds retry and timeout information, 

658 # and friendly error handling. 

659 rpc = self._client._transport._wrapped_methods[ 

660 self._client._transport.split_read_stream 

661 ] 

662 

663 # Certain fields should be provided within the metadata header; 

664 # add these here. 

665 metadata = tuple(metadata) + ( 

666 gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), 

667 ) 

668 

669 # Validate the universe domain. 

670 self._client._validate_universe_domain() 

671 

672 # Send the request. 

673 response = await rpc( 

674 request, 

675 retry=retry, 

676 timeout=timeout, 

677 metadata=metadata, 

678 ) 

679 

680 # Done; return the response. 

681 return response 

682 

683 async def __aenter__(self) -> "BigQueryReadAsyncClient": 

684 return self 

685 

686 async def __aexit__(self, exc_type, exc, tb): 

687 await self.transport.close() 

688 

689 

690DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

691 gapic_version=package_version.__version__ 

692) 

693 

694if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

695 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

696 

697 

698__all__ = ("BigQueryReadAsyncClient",)