Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py: 44%

64 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import warnings 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

18 

19from google.api_core import gapic_v1 

20from google.api_core import grpc_helpers_async 

21from google.auth import credentials as ga_credentials # type: ignore 

22from google.auth.transport.grpc import SslCredentials # type: ignore 

23 

24import grpc # type: ignore 

25from grpc.experimental import aio # type: ignore 

26 

27from google.cloud.bigquery_storage_v1.types import storage 

28from google.cloud.bigquery_storage_v1.types import stream 

29from .base import BigQueryReadTransport, DEFAULT_CLIENT_INFO 

30from .grpc import BigQueryReadGrpcTransport 

31 

32 

33class BigQueryReadGrpcAsyncIOTransport(BigQueryReadTransport): 

34 """gRPC AsyncIO backend transport for BigQueryRead. 

35 

36 BigQuery Read API. 

37 The Read API can be used to read data from BigQuery. 

38 

39 This class defines the same methods as the primary client, so the 

40 primary client can load the underlying transport implementation 

41 and call it. 

42 

43 It sends protocol buffers over the wire using gRPC (which is built on 

44 top of HTTP/2); the ``grpcio`` package must be installed. 

45 """ 

46 

47 _grpc_channel: aio.Channel 

48 _stubs: Dict[str, Callable] = {} 

49 

50 @classmethod 

51 def create_channel( 

52 cls, 

53 host: str = "bigquerystorage.googleapis.com", 

54 credentials: Optional[ga_credentials.Credentials] = None, 

55 credentials_file: Optional[str] = None, 

56 scopes: Optional[Sequence[str]] = None, 

57 quota_project_id: Optional[str] = None, 

58 **kwargs, 

59 ) -> aio.Channel: 

60 """Create and return a gRPC AsyncIO channel object. 

61 Args: 

62 host (Optional[str]): The host for the channel to use. 

63 credentials (Optional[~.Credentials]): The 

64 authorization credentials to attach to requests. These 

65 credentials identify this application to the service. If 

66 none are specified, the client will attempt to ascertain 

67 the credentials from the environment. 

68 credentials_file (Optional[str]): A file with credentials that can 

69 be loaded with :func:`google.auth.load_credentials_from_file`. 

70 This argument is ignored if ``channel`` is provided. 

71 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

72 service. These are only used when credentials are not specified and 

73 are passed to :func:`google.auth.default`. 

74 quota_project_id (Optional[str]): An optional project to use for billing 

75 and quota. 

76 kwargs (Optional[dict]): Keyword arguments, which are passed to the 

77 channel creation. 

78 Returns: 

79 aio.Channel: A gRPC AsyncIO channel object. 

80 """ 

81 

82 return grpc_helpers_async.create_channel( 

83 host, 

84 credentials=credentials, 

85 credentials_file=credentials_file, 

86 quota_project_id=quota_project_id, 

87 default_scopes=cls.AUTH_SCOPES, 

88 scopes=scopes, 

89 default_host=cls.DEFAULT_HOST, 

90 **kwargs, 

91 ) 

92 

93 def __init__( 

94 self, 

95 *, 

96 host: str = "bigquerystorage.googleapis.com", 

97 credentials: Optional[ga_credentials.Credentials] = None, 

98 credentials_file: Optional[str] = None, 

99 scopes: Optional[Sequence[str]] = None, 

100 channel: Optional[aio.Channel] = None, 

101 api_mtls_endpoint: Optional[str] = None, 

102 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

103 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None, 

104 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None, 

105 quota_project_id: Optional[str] = None, 

106 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

107 always_use_jwt_access: Optional[bool] = False, 

108 api_audience: Optional[str] = None, 

109 ) -> None: 

110 """Instantiate the transport. 

111 

112 Args: 

113 host (Optional[str]): 

114 The hostname to connect to. 

115 credentials (Optional[google.auth.credentials.Credentials]): The 

116 authorization credentials to attach to requests. These 

117 credentials identify the application to the service; if none 

118 are specified, the client will attempt to ascertain the 

119 credentials from the environment. 

120 This argument is ignored if ``channel`` is provided. 

121 credentials_file (Optional[str]): A file with credentials that can 

122 be loaded with :func:`google.auth.load_credentials_from_file`. 

123 This argument is ignored if ``channel`` is provided. 

124 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this 

125 service. These are only used when credentials are not specified and 

126 are passed to :func:`google.auth.default`. 

127 channel (Optional[aio.Channel]): A ``Channel`` instance through 

128 which to make calls. 

129 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint. 

130 If provided, it overrides the ``host`` argument and tries to create 

131 a mutual TLS channel with client SSL credentials from 

132 ``client_cert_source`` or application default SSL credentials. 

133 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]): 

134 Deprecated. A callback to provide client SSL certificate bytes and 

135 private key bytes, both in PEM format. It is ignored if 

136 ``api_mtls_endpoint`` is None. 

137 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials 

138 for the grpc channel. It is ignored if ``channel`` is provided. 

139 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]): 

140 A callback to provide client certificate bytes and private key bytes, 

141 both in PEM format. It is used to configure a mutual TLS channel. It is 

142 ignored if ``channel`` or ``ssl_channel_credentials`` is provided. 

143 quota_project_id (Optional[str]): An optional project to use for billing 

144 and quota. 

145 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

146 The client info used to send a user-agent string along with 

147 API requests. If ``None``, then default info will be used. 

148 Generally, you only need to set this if you're developing 

149 your own client library. 

150 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

151 be used for service account credentials. 

152 

153 Raises: 

154 google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport 

155 creation failed for any reason. 

156 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` 

157 and ``credentials_file`` are passed. 

158 """ 

159 self._grpc_channel = None 

160 self._ssl_channel_credentials = ssl_channel_credentials 

161 self._stubs: Dict[str, Callable] = {} 

162 

163 if api_mtls_endpoint: 

164 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) 

165 if client_cert_source: 

166 warnings.warn("client_cert_source is deprecated", DeprecationWarning) 

167 

168 if channel: 

169 # Ignore credentials if a channel was passed. 

170 credentials = False 

171 # If a channel was explicitly provided, set it. 

172 self._grpc_channel = channel 

173 self._ssl_channel_credentials = None 

174 else: 

175 if api_mtls_endpoint: 

176 host = api_mtls_endpoint 

177 

178 # Create SSL credentials with client_cert_source or application 

179 # default SSL credentials. 

180 if client_cert_source: 

181 cert, key = client_cert_source() 

182 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

183 certificate_chain=cert, private_key=key 

184 ) 

185 else: 

186 self._ssl_channel_credentials = SslCredentials().ssl_credentials 

187 

188 else: 

189 if client_cert_source_for_mtls and not ssl_channel_credentials: 

190 cert, key = client_cert_source_for_mtls() 

191 self._ssl_channel_credentials = grpc.ssl_channel_credentials( 

192 certificate_chain=cert, private_key=key 

193 ) 

194 

195 # The base transport sets the host, credentials and scopes 

196 super().__init__( 

197 host=host, 

198 credentials=credentials, 

199 credentials_file=credentials_file, 

200 scopes=scopes, 

201 quota_project_id=quota_project_id, 

202 client_info=client_info, 

203 always_use_jwt_access=always_use_jwt_access, 

204 api_audience=api_audience, 

205 ) 

206 

207 if not self._grpc_channel: 

208 self._grpc_channel = type(self).create_channel( 

209 self._host, 

210 # use the credentials which are saved 

211 credentials=self._credentials, 

212 # Set ``credentials_file`` to ``None`` here as 

213 # the credentials that we saved earlier should be used. 

214 credentials_file=None, 

215 scopes=self._scopes, 

216 ssl_credentials=self._ssl_channel_credentials, 

217 quota_project_id=quota_project_id, 

218 options=[ 

219 ("grpc.max_send_message_length", -1), 

220 ("grpc.max_receive_message_length", -1), 

221 ], 

222 ) 

223 

224 # Wrap messages. This must be done after self._grpc_channel exists 

225 self._prep_wrapped_messages(client_info) 

226 

227 @property 

228 def grpc_channel(self) -> aio.Channel: 

229 """Create the channel designed to connect to this service. 

230 

231 This property caches on the instance; repeated calls return 

232 the same channel. 

233 """ 

234 # Return the channel from cache. 

235 return self._grpc_channel 

236 

237 @property 

238 def create_read_session( 

239 self, 

240 ) -> Callable[[storage.CreateReadSessionRequest], Awaitable[stream.ReadSession]]: 

241 r"""Return a callable for the create read session method over gRPC. 

242 

243 Creates a new read session. A read session divides 

244 the contents of a BigQuery table into one or more 

245 streams, which can then be used to read data from the 

246 table. The read session also specifies properties of the 

247 data to be read, such as a list of columns or a 

248 push-down filter describing the rows to be returned. 

249 

250 A particular row can be read by at most one stream. When 

251 the caller has reached the end of each stream in the 

252 session, then all the data in the table has been read. 

253 

254 Data is assigned to each stream such that roughly the 

255 same number of rows can be read from each stream. 

256 Because the server-side unit for assigning data is 

257 collections of rows, the API does not guarantee that 

258 each stream will return the same number or rows. 

259 Additionally, the limits are enforced based on the 

260 number of pre-filtered rows, so some filters can lead to 

261 lopsided assignments. 

262 

263 Read sessions automatically expire 6 hours after they 

264 are created and do not require manual clean-up by the 

265 caller. 

266 

267 Returns: 

268 Callable[[~.CreateReadSessionRequest], 

269 Awaitable[~.ReadSession]]: 

270 A function that, when called, will call the underlying RPC 

271 on the server. 

272 """ 

273 # Generate a "stub function" on-the-fly which will actually make 

274 # the request. 

275 # gRPC handles serialization and deserialization, so we just need 

276 # to pass in the functions for each. 

277 if "create_read_session" not in self._stubs: 

278 self._stubs["create_read_session"] = self.grpc_channel.unary_unary( 

279 "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession", 

280 request_serializer=storage.CreateReadSessionRequest.serialize, 

281 response_deserializer=stream.ReadSession.deserialize, 

282 ) 

283 return self._stubs["create_read_session"] 

284 

285 @property 

286 def read_rows( 

287 self, 

288 ) -> Callable[[storage.ReadRowsRequest], Awaitable[storage.ReadRowsResponse]]: 

289 r"""Return a callable for the read rows method over gRPC. 

290 

291 Reads rows from the stream in the format prescribed 

292 by the ReadSession. Each response contains one or more 

293 table rows, up to a maximum of 100 MiB per response; 

294 read requests which attempt to read individual rows 

295 larger than 100 MiB will fail. 

296 

297 Each request also returns a set of stream statistics 

298 reflecting the current state of the stream. 

299 

300 Returns: 

301 Callable[[~.ReadRowsRequest], 

302 Awaitable[~.ReadRowsResponse]]: 

303 A function that, when called, will call the underlying RPC 

304 on the server. 

305 """ 

306 # Generate a "stub function" on-the-fly which will actually make 

307 # the request. 

308 # gRPC handles serialization and deserialization, so we just need 

309 # to pass in the functions for each. 

310 if "read_rows" not in self._stubs: 

311 self._stubs["read_rows"] = self.grpc_channel.unary_stream( 

312 "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows", 

313 request_serializer=storage.ReadRowsRequest.serialize, 

314 response_deserializer=storage.ReadRowsResponse.deserialize, 

315 ) 

316 return self._stubs["read_rows"] 

317 

318 @property 

319 def split_read_stream( 

320 self, 

321 ) -> Callable[ 

322 [storage.SplitReadStreamRequest], Awaitable[storage.SplitReadStreamResponse] 

323 ]: 

324 r"""Return a callable for the split read stream method over gRPC. 

325 

326 Splits a given ``ReadStream`` into two ``ReadStream`` objects. 

327 These ``ReadStream`` objects are referred to as the primary and 

328 the residual streams of the split. The original ``ReadStream`` 

329 can still be read from in the same manner as before. Both of the 

330 returned ``ReadStream`` objects can also be read from, and the 

331 rows returned by both child streams will be the same as the rows 

332 read from the original stream. 

333 

334 Moreover, the two child streams will be allocated back-to-back 

335 in the original ``ReadStream``. Concretely, it is guaranteed 

336 that for streams original, primary, and residual, that 

337 original[0-j] = primary[0-j] and original[j-n] = residual[0-m] 

338 once the streams have been read to completion. 

339 

340 Returns: 

341 Callable[[~.SplitReadStreamRequest], 

342 Awaitable[~.SplitReadStreamResponse]]: 

343 A function that, when called, will call the underlying RPC 

344 on the server. 

345 """ 

346 # Generate a "stub function" on-the-fly which will actually make 

347 # the request. 

348 # gRPC handles serialization and deserialization, so we just need 

349 # to pass in the functions for each. 

350 if "split_read_stream" not in self._stubs: 

351 self._stubs["split_read_stream"] = self.grpc_channel.unary_unary( 

352 "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream", 

353 request_serializer=storage.SplitReadStreamRequest.serialize, 

354 response_deserializer=storage.SplitReadStreamResponse.deserialize, 

355 ) 

356 return self._stubs["split_read_stream"] 

357 

358 def close(self): 

359 return self.grpc_channel.close() 

360 

361 

362__all__ = ("BigQueryReadGrpcAsyncIOTransport",)