Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/grpc_asyncio.py: 43%

79 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import warnings 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

18 

19from google.api_core import gapic_v1 

20from google.api_core import grpc_helpers_async 

21from google.auth import credentials as ga_credentials # type: ignore 

22from google.auth.transport.grpc import SslCredentials # type: ignore 

23 

24import grpc # type: ignore 

25from grpc.experimental import aio # type: ignore 

26 

27from google.cloud.bigquery_storage_v1.types import storage 

28from google.cloud.bigquery_storage_v1.types import stream 

29from .base import BigQueryWriteTransport, DEFAULT_CLIENT_INFO 

30from .grpc import BigQueryWriteGrpcTransport 

31 

32 

class BigQueryWriteGrpcAsyncIOTransport(BigQueryWriteTransport):
    """gRPC AsyncIO backend transport for BigQueryWrite.

    BigQuery Write API.
    The Write API can be used to write data to BigQuery.
    For supplementary information about the Write API, see:
    https://cloud.google.com/bigquery/docs/write-api

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    # Class-level default only: ``__init__`` rebinds ``_stubs`` to a fresh
    # dict, so instances built through ``__init__`` do not share this one.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.

        All arguments are forwarded to
        ``grpc_helpers_async.create_channel`` together with the class-level
        ``AUTH_SCOPES`` (as ``default_scopes``) and ``DEFAULT_HOST`` (as
        ``default_host``).

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the base
                transport's constructor. NOTE(review): its meaning is defined
                by ``BigQueryWriteTransport``; presumably the audience used
                for self signed JWTs -- confirm against the base class.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
              creation failed for any reason.
          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            # NOTE(review): ``False`` (not ``None``) is presumably chosen so
            # the base transport does not fall back to default credentials --
            # confirm against BigQueryWriteTransport before changing it.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Return the channel used to connect to this service.

        The channel is stored on the instance by ``__init__`` (either the
        one passed in or the one created there); this property only returns
        it, so repeated accesses yield the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    def _get_stub(
        self,
        name: str,
        rpc_kind: str,
        method_path: str,
        request_type,
        response_type,
    ) -> Callable:
        """Return the stub cached under ``name``, creating it on first use.

        Args:
            name (str): Key under which the stub is stored in ``self._stubs``.
            rpc_kind (str): Name of the channel factory method to call, e.g.
                ``"unary_unary"`` or ``"stream_stream"``.
            method_path (str): Fully qualified gRPC method path.
            request_type: Message class whose ``serialize`` attribute is used
                as the request serializer.
            response_type: Message class whose ``deserialize`` attribute is
                used as the response deserializer.

        Returns:
            Callable: The (possibly newly created) stub.
        """
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each. The channel is only touched
        # when the stub is not cached yet.
        if name not in self._stubs:
            make_stub = getattr(self.grpc_channel, rpc_kind)
            self._stubs[name] = make_stub(
                method_path,
                request_serializer=request_type.serialize,
                response_deserializer=response_type.deserialize,
            )
        return self._stubs[name]

    @property
    def create_write_stream(
        self,
    ) -> Callable[[storage.CreateWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the create write stream method over gRPC.

        Creates a write stream to the given table. Additionally, every
        table has a special stream named '_default' to which data can be
        written. This stream doesn't need to be created using
        CreateWriteStream. It is a stream that can be used
        simultaneously by any number of clients. Data written to this
        stream is considered committed as soon as an acknowledgement is
        received.

        Returns:
            Callable[[~.CreateWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "create_write_stream",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/CreateWriteStream",
            storage.CreateWriteStreamRequest,
            stream.WriteStream,
        )

    @property
    def append_rows(
        self,
    ) -> Callable[[storage.AppendRowsRequest], Awaitable[storage.AppendRowsResponse]]:
        r"""Return a callable for the append rows method over gRPC.

        Appends data to the given stream.

        If ``offset`` is specified, the ``offset`` is checked against
        the end of stream. The server returns ``OUT_OF_RANGE`` in
        ``AppendRowsResponse`` if an attempt is made to append to an
        offset beyond the current end of the stream or
        ``ALREADY_EXISTS`` if user provides an ``offset`` that has
        already been written to. User can retry with adjusted offset
        within the same RPC connection. If ``offset`` is not specified,
        append happens at the end of the stream.

        The response contains an optional offset at which the append
        happened. No offset information will be returned for appends to
        a default stream.

        Responses are received in the same order in which requests are
        sent. There will be one response for each successful inserted
        request. Responses may optionally embed error information if the
        originating AppendRequest was not successfully processed.

        The specifics of when successfully appended data is made visible
        to the table are governed by the type of stream:

        - For COMMITTED streams (which includes the default stream),
          data is visible immediately upon successful append.

        - For BUFFERED streams, data is made visible via a subsequent
          ``FlushRows`` rpc which advances a cursor to a newer offset
          in the stream.

        - For PENDING streams, data is not made visible until the
          stream itself is finalized (via the ``FinalizeWriteStream``
          rpc), and the stream is explicitly committed via the
          ``BatchCommitWriteStreams`` rpc.

        Returns:
            Callable[[~.AppendRowsRequest],
                    Awaitable[~.AppendRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "append_rows",
            "stream_stream",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/AppendRows",
            storage.AppendRowsRequest,
            storage.AppendRowsResponse,
        )

    @property
    def get_write_stream(
        self,
    ) -> Callable[[storage.GetWriteStreamRequest], Awaitable[stream.WriteStream]]:
        r"""Return a callable for the get write stream method over gRPC.

        Gets information about a write stream.

        Returns:
            Callable[[~.GetWriteStreamRequest],
                    Awaitable[~.WriteStream]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "get_write_stream",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/GetWriteStream",
            storage.GetWriteStreamRequest,
            stream.WriteStream,
        )

    @property
    def finalize_write_stream(
        self,
    ) -> Callable[
        [storage.FinalizeWriteStreamRequest],
        Awaitable[storage.FinalizeWriteStreamResponse],
    ]:
        r"""Return a callable for the finalize write stream method over gRPC.

        Finalize a write stream so that no new data can be appended to
        the stream. Finalize is not supported on the '_default' stream.

        Returns:
            Callable[[~.FinalizeWriteStreamRequest],
                    Awaitable[~.FinalizeWriteStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "finalize_write_stream",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FinalizeWriteStream",
            storage.FinalizeWriteStreamRequest,
            storage.FinalizeWriteStreamResponse,
        )

    @property
    def batch_commit_write_streams(
        self,
    ) -> Callable[
        [storage.BatchCommitWriteStreamsRequest],
        Awaitable[storage.BatchCommitWriteStreamsResponse],
    ]:
        r"""Return a callable for the batch commit write streams method over gRPC.

        Atomically commits a group of ``PENDING`` streams that belong to
        the same ``parent`` table.

        Streams must be finalized before commit and cannot be committed
        multiple times. Once a stream is committed, data in the stream
        becomes available for read operations.

        Returns:
            Callable[[~.BatchCommitWriteStreamsRequest],
                    Awaitable[~.BatchCommitWriteStreamsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "batch_commit_write_streams",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/BatchCommitWriteStreams",
            storage.BatchCommitWriteStreamsRequest,
            storage.BatchCommitWriteStreamsResponse,
        )

    @property
    def flush_rows(
        self,
    ) -> Callable[[storage.FlushRowsRequest], Awaitable[storage.FlushRowsResponse]]:
        r"""Return a callable for the flush rows method over gRPC.

        Flushes rows to a BUFFERED stream.

        If users are appending rows to BUFFERED stream, flush operation
        is required in order for the rows to become available for
        reading. A Flush operation flushes up to any previously flushed
        offset in a BUFFERED stream, to the offset specified in the
        request.

        Flush is not supported on the \_default stream, since it is not
        BUFFERED.

        Returns:
            Callable[[~.FlushRowsRequest],
                    Awaitable[~.FlushRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # The stub function is generated on first access and cached.
        return self._get_stub(
            "flush_rows",
            "unary_unary",
            "/google.cloud.bigquery.storage.v1.BigQueryWrite/FlushRows",
            storage.FlushRowsRequest,
            storage.FlushRowsResponse,
        )

    def close(self):
        """Close the underlying gRPC channel.

        Returns whatever ``self.grpc_channel.close()`` returns.
        NOTE(review): for an ``aio.Channel`` this is presumably an awaitable
        that the caller must await -- confirm against the grpc.aio API.
        """
        return self.grpc_channel.close()

457 

458 

# Public API of this module: only the AsyncIO transport class is exported.
__all__ = (
    "BigQueryWriteGrpcAsyncIOTransport",
)