Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py: 61%

97 statements

coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
import functools
import re
from typing import (
    Dict,
    Mapping,
    MutableMapping,
    MutableSequence,
    Optional,
    AsyncIterable,
    Awaitable,
    Sequence,
    Tuple,
    Type,
    Union,
)

from google.cloud.bigquery_storage_v1 import gapic_version as package_version

from google.api_core.client_options import ClientOptions
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.auth import credentials as ga_credentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

try:
    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
except AttributeError:  # pragma: NO COVER
    OptionalRetry = Union[retries.Retry, object]  # type: ignore
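
# The try/except above is a compatibility shim: older google-api-core
# releases do not expose `gapic_v1.method._MethodDefault`, so the alias
# falls back to a looser annotation on those versions.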

from google.cloud.bigquery_storage_v1.types import arrow
from google.cloud.bigquery_storage_v1.types import avro
from google.cloud.bigquery_storage_v1.types import storage
from google.cloud.bigquery_storage_v1.types import stream
from google.protobuf import timestamp_pb2  # type: ignore
from .transports.base import BigQueryReadTransport, DEFAULT_CLIENT_INFO
from .transports.grpc_asyncio import BigQueryReadGrpcAsyncIOTransport
from .client import BigQueryReadClient


class BigQueryReadAsyncClient:
    """BigQuery Read API.

    The Read API can be used to read data from BigQuery.
    """

    _client: BigQueryReadClient

    DEFAULT_ENDPOINT = BigQueryReadClient.DEFAULT_ENDPOINT
    DEFAULT_MTLS_ENDPOINT = BigQueryReadClient.DEFAULT_MTLS_ENDPOINT

    read_session_path = staticmethod(BigQueryReadClient.read_session_path)
    parse_read_session_path = staticmethod(BigQueryReadClient.parse_read_session_path)
    read_stream_path = staticmethod(BigQueryReadClient.read_stream_path)
    parse_read_stream_path = staticmethod(BigQueryReadClient.parse_read_stream_path)
    table_path = staticmethod(BigQueryReadClient.table_path)
    parse_table_path = staticmethod(BigQueryReadClient.parse_table_path)
    common_billing_account_path = staticmethod(
        BigQueryReadClient.common_billing_account_path
    )
    parse_common_billing_account_path = staticmethod(
        BigQueryReadClient.parse_common_billing_account_path
    )
    common_folder_path = staticmethod(BigQueryReadClient.common_folder_path)
    parse_common_folder_path = staticmethod(BigQueryReadClient.parse_common_folder_path)
    common_organization_path = staticmethod(BigQueryReadClient.common_organization_path)
    parse_common_organization_path = staticmethod(
        BigQueryReadClient.parse_common_organization_path
    )
    common_project_path = staticmethod(BigQueryReadClient.common_project_path)
    parse_common_project_path = staticmethod(
        BigQueryReadClient.parse_common_project_path
    )
    common_location_path = staticmethod(BigQueryReadClient.common_location_path)
    parse_common_location_path = staticmethod(
        BigQueryReadClient.parse_common_location_path
    )
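
    # Usage sketch (illustrative, not part of the generated surface): the path
    # helpers above delegate to BigQueryReadClient to build and parse the
    # resource names used by this API. Assuming the standard templates:
    #
    #     name = BigQueryReadAsyncClient.read_stream_path(
    #         "my-project", "us", "my-session", "my-stream"
    #     )
    #     # -> "projects/my-project/locations/us/sessions/my-session/streams/my-stream"
    #     BigQueryReadAsyncClient.parse_read_stream_path(name)
    #     # -> {"project": "my-project", "location": "us",
    #     #     "session": "my-session", "stream": "my-stream"}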

    @classmethod
    def from_service_account_info(cls, info: dict, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        info.

        Args:
            info (dict): The service account private key info.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
        """
        return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs)  # type: ignore

    @classmethod
    def from_service_account_file(cls, filename: str, *args, **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            args: Additional arguments to pass to the constructor.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            BigQueryReadAsyncClient: The constructed client.
        """
        return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs)  # type: ignore

    from_service_account_json = from_service_account_file
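
    # Usage sketch (illustrative; "key.json" is a placeholder path): both
    # classmethods above build a fully configured async client from service
    # account credentials.
    #
    #     client = BigQueryReadAsyncClient.from_service_account_file("key.json")
    #
    #     # or, with the key material already parsed into a dict:
    #     import json
    #     with open("key.json") as f:
    #         client = BigQueryReadAsyncClient.from_service_account_info(json.load(f))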

    @classmethod
    def get_mtls_endpoint_and_cert_source(
        cls, client_options: Optional[ClientOptions] = None
    ):
        """Return the API endpoint and client cert source for mutual TLS.

        The client cert source is determined in the following order:
        (1) if the `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not
        "true", the client cert source is None.
        (2) if `client_options.client_cert_source` is provided, use the provided one;
        if the default client cert source exists, use the default one; otherwise the
        client cert source is None.

        The API endpoint is determined in the following order:
        (1) if `client_options.api_endpoint` is provided, use the provided one.
        (2) if the `GOOGLE_API_USE_MTLS_ENDPOINT` environment variable is "always",
        use the default mTLS endpoint; if the environment variable is "never", use
        the default API endpoint; otherwise, if the client cert source exists, use
        the default mTLS endpoint, otherwise use the default API endpoint.

        More details can be found at https://google.aip.dev/auth/4114.

        Args:
            client_options (google.api_core.client_options.ClientOptions): Custom options for the
                client. Only the `api_endpoint` and `client_cert_source` properties may be used
                in this method.

        Returns:
            Tuple[str, Callable[[], Tuple[bytes, bytes]]]: returns the API endpoint and the
                client cert source to use.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If any errors happen.
        """
        return BigQueryReadClient.get_mtls_endpoint_and_cert_source(client_options)  # type: ignore
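
    # Usage sketch (illustrative): resolving the effective endpoint up front,
    # e.g. to log which endpoint a client would use. With no overrides and no
    # mTLS environment variables set, this returns the default endpoint
    # ("bigquerystorage.googleapis.com") and a None cert source.
    #
    #     endpoint, cert_source = (
    #         BigQueryReadAsyncClient.get_mtls_endpoint_and_cert_source(
    #             ClientOptions()
    #         )
    #     )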

    @property
    def transport(self) -> BigQueryReadTransport:
        """Returns the transport used by the client instance.

        Returns:
            BigQueryReadTransport: The transport used by the client instance.
        """
        return self._client.transport

    get_transport_class = functools.partial(
        type(BigQueryReadClient).get_transport_class, type(BigQueryReadClient)
    )

    def __init__(
        self,
        *,
        credentials: Optional[ga_credentials.Credentials] = None,
        transport: Union[str, BigQueryReadTransport] = "grpc_asyncio",
        client_options: Optional[ClientOptions] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
    ) -> None:
        """Instantiates the big query read async client.

        Args:
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            transport (Union[str, ~.BigQueryReadTransport]): The
                transport to use. If set to None, a transport is chosen
                automatically.
            client_options (ClientOptions): Custom options for the client. It
                won't take effect if a ``transport`` instance is provided.
                (1) The ``api_endpoint`` property can be used to override the
                default endpoint provided by the client. The
                GOOGLE_API_USE_MTLS_ENDPOINT environment variable can also be
                used to override the endpoint: "always" (always use the default
                mTLS endpoint), "never" (always use the default regular
                endpoint) and "auto" (auto-switch to the default mTLS endpoint
                if a client certificate is present; this is the default value).
                However, the ``api_endpoint`` property takes precedence if
                provided.
                (2) If the GOOGLE_API_USE_CLIENT_CERTIFICATE environment
                variable is "true", then the ``client_cert_source`` property
                can be used to provide a client certificate for mutual TLS
                transport. If not provided, the default SSL client certificate
                will be used if present. If GOOGLE_API_USE_CLIENT_CERTIFICATE
                is "false" or not set, no client certificate will be used.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
                creation failed for any reason.
        """
        self._client = BigQueryReadClient(
            credentials=credentials,
            transport=transport,
            client_options=client_options,
            client_info=client_info,
        )
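
    # Usage sketch (illustrative; the endpoint below is a placeholder): the
    # constructor is keyword-only, and ``client_options`` is how a custom or
    # regional endpoint is supplied.
    #
    #     client = BigQueryReadAsyncClient(
    #         client_options=ClientOptions(
    #             api_endpoint="bigquerystorage.example-region.googleapis.com"
    #         ),
    #     )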

    async def create_read_session(
        self,
        request: Optional[Union[storage.CreateReadSessionRequest, dict]] = None,
        *,
        parent: Optional[str] = None,
        read_session: Optional[stream.ReadSession] = None,
        max_stream_count: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> stream.ReadSession:
        r"""Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number of rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_create_read_session():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.CreateReadSessionRequest(
                    parent="parent_value",
                )

                # Make the request
                response = await client.create_read_session(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.CreateReadSessionRequest, dict]]):
                The request object. Request message for ``CreateReadSession``.
            parent (:class:`str`):
                Required. The request project that owns the session, in
                the form of ``projects/{project_id}``.

                This corresponds to the ``parent`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            read_session (:class:`google.cloud.bigquery_storage_v1.types.ReadSession`):
                Required. Session to be created.

                This corresponds to the ``read_session`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            max_stream_count (:class:`int`):
                Max initial number of streams. If unset or zero, the
                server will provide a value of streams so as to produce
                reasonable throughput. Must be non-negative. The number
                of streams may be lower than the requested number,
                depending on the amount of parallelism that is
                reasonable for the table. There is a default system max
                limit of 1,000.

                This must be greater than or equal to
                preferred_min_stream_count. Typically, clients should
                either leave this unset to let the system determine an
                upper bound OR set this to a size for the maximum
                "units of work" it can gracefully handle.

                This corresponds to the ``max_stream_count`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.ReadSession:
                Information about the ReadSession.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([parent, read_session, max_stream_count])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.CreateReadSessionRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if parent is not None:
            request.parent = parent
        if read_session is not None:
            request.read_session = read_session
        if max_stream_count is not None:
            request.max_stream_count = max_stream_count

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.create_read_session,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_session.table", request.read_session.table),)
            ),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
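
    # Note on the default retry above (illustrative arithmetic): the nominal
    # delay between attempts grows as 0.1 * 1.3**n seconds, capped at 60.0 s,
    # until the 600.0 s deadline is exhausted:
    #
    #     attempt:        1      2      3      4       5
    #     nominal delay:  0.1    0.13   0.169  0.2197  0.28561  ... -> 60.0 cap
    #
    # api_core also applies random jitter, so observed sleeps vary below these
    # nominal values, and only DeadlineExceeded and ServiceUnavailable errors
    # are retried.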

    def read_rows(
        self,
        request: Optional[Union[storage.ReadRowsRequest, dict]] = None,
        *,
        read_stream: Optional[str] = None,
        offset: Optional[int] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Awaitable[AsyncIterable[storage.ReadRowsResponse]]:
        r"""Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_read_rows():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.ReadRowsRequest(
                    read_stream="read_stream_value",
                )

                # Make the request
                stream = await client.read_rows(request=request)

                # Handle the response
                async for response in stream:
                    print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.ReadRowsRequest, dict]]):
                The request object. Request message for ``ReadRows``.
            read_stream (:class:`str`):
                Required. Stream to read rows from.

                This corresponds to the ``read_stream`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            offset (:class:`int`):
                The offset requested must be less than the last row
                read from Read. Requesting a larger offset is
                undefined. If not specified, start reading from offset
                zero.

                This corresponds to the ``offset`` field
                on the ``request`` instance; if ``request`` is provided, this
                should not be set.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            AsyncIterable[google.cloud.bigquery_storage_v1.types.ReadRowsResponse]:
                Response from calling ReadRows may include row data, progress and
                throttling information.
        """
        # Create or coerce a protobuf request object.
        # Quick check: If we got a request object, we should *not* have
        # gotten any keyword arguments that map to the request.
        has_flattened_params = any([read_stream, offset])
        if request is not None and has_flattened_params:
            raise ValueError(
                "If the `request` argument is set, then none of "
                "the individual field arguments should be set."
            )

        request = storage.ReadRowsRequest(request)

        # If we have keyword arguments corresponding to fields on the
        # request, apply these.
        if read_stream is not None:
            request.read_stream = read_stream
        if offset is not None:
            request.offset = offset

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.read_rows,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=86400.0,
            ),
            default_timeout=86400.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata(
                (("read_stream", request.read_stream),)
            ),
        )

        # Send the request.
        response = rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
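
    # Usage sketch (illustrative; names are placeholders): because ``offset``
    # marks where reading resumes, a caller can restart a stream after a
    # transient failure without re-reading rows.
    #
    #     rows_read = 0
    #     stream = await client.read_rows(read_stream=stream_name, offset=rows_read)
    #     async for response in stream:
    #         rows_read += response.row_count
    #         # ... process response.arrow_record_batch or response.avro_rows ...
    #     # After an interruption, call read_rows again with offset=rows_read.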

    async def split_read_stream(
        self,
        request: Optional[Union[storage.SplitReadStreamRequest, dict]] = None,
        *,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> storage.SplitReadStreamResponse:
        r"""Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        .. code-block:: python

            # This snippet has been automatically generated and should be regarded as a
            # code template only.
            # It will require modifications to work:
            # - It may require correct/in-range values for request initialization.
            # - It may require specifying regional endpoints when creating the service
            #   client as shown in:
            #   https://googleapis.dev/python/google-api-core/latest/client_options.html
            from google.cloud import bigquery_storage_v1

            async def sample_split_read_stream():
                # Create a client
                client = bigquery_storage_v1.BigQueryReadAsyncClient()

                # Initialize request argument(s)
                request = bigquery_storage_v1.SplitReadStreamRequest(
                    name="name_value",
                )

                # Make the request
                response = await client.split_read_stream(request=request)

                # Handle the response
                print(response)

        Args:
            request (Optional[Union[google.cloud.bigquery_storage_v1.types.SplitReadStreamRequest, dict]]):
                The request object. Request message for ``SplitReadStream``.
            retry (google.api_core.retry.Retry): Designation of what errors, if any,
                should be retried.
            timeout (float): The timeout for this request.
            metadata (Sequence[Tuple[str, str]]): Strings which should be
                sent along with the request as metadata.

        Returns:
            google.cloud.bigquery_storage_v1.types.SplitReadStreamResponse:
                Response message for SplitReadStream.
        """
        # Create or coerce a protobuf request object.
        request = storage.SplitReadStreamRequest(request)

        # Wrap the RPC method; this adds retry and timeout information,
        # and friendly error handling.
        rpc = gapic_v1.method_async.wrap_method(
            self._client._transport.split_read_stream,
            default_retry=retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=600.0,
            ),
            default_timeout=600.0,
            client_info=DEFAULT_CLIENT_INFO,
        )

        # Certain fields should be provided within the metadata header;
        # add these here.
        metadata = tuple(metadata) + (
            gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
        )

        # Send the request.
        response = await rpc(
            request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

        # Done; return the response.
        return response
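
    # Usage sketch (illustrative; ``stream_name`` is a placeholder, and the
    # ``fraction`` field, the approximate split point, is assumed from the
    # SplitReadStreamRequest message): handing half of a busy stream to a
    # second worker.
    #
    #     response = await client.split_read_stream(
    #         request=storage.SplitReadStreamRequest(
    #             name=stream_name,
    #             fraction=0.5,  # split roughly halfway through the stream
    #         )
    #     )
    #     # response.primary_stream and response.remainder_stream are the two
    #     # child ReadStream objects described in the docstring above.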

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.transport.close()
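
    # Usage sketch (illustrative): __aenter__/__aexit__ above make the client
    # an async context manager, so the transport channel is closed on exit.
    #
    #     async def main():
    #         async with BigQueryReadAsyncClient() as client:
    #             session = await client.create_read_session(...)
    #         # the gRPC channel is closed here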


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__
)


__all__ = ("BigQueryReadAsyncClient",)
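
# End-to-end usage sketch (illustrative; project, dataset, and table names are
# placeholders): create a session capped at one stream, then read it.
#
#     import asyncio
#     from google.cloud import bigquery_storage_v1
#     from google.cloud.bigquery_storage_v1 import types
#
#     async def main():
#         async with bigquery_storage_v1.BigQueryReadAsyncClient() as client:
#             session = await client.create_read_session(
#                 parent="projects/my-project",
#                 read_session=types.ReadSession(
#                     table="projects/my-project/datasets/my_dataset/tables/my_table",
#                     data_format=types.DataFormat.ARROW,
#                 ),
#                 max_stream_count=1,
#             )
#             if session.streams:
#                 stream = await client.read_rows(read_stream=session.streams[0].name)
#                 async for response in stream:
#                     print(response.row_count)
#
#     asyncio.run(main())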