Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

62 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union 

17import warnings 

18 

19from google.api_core import gapic_v1, grpc_helpers_async 

20from google.auth import credentials as ga_credentials # type: ignore 

21from google.auth.transport.grpc import SslCredentials # type: ignore 

22import grpc # type: ignore 

23from grpc.experimental import aio # type: ignore 

24 

25from google.cloud.bigquery_storage_v1.types import storage, stream 

26 

27from .base import DEFAULT_CLIENT_INFO, BigQueryReadTransport 

28from .grpc import BigQueryReadGrpcTransport 

29 

30 

class BigQueryReadGrpcAsyncIOTransport(BigQueryReadTransport):
    """gRPC AsyncIO backend transport for BigQueryRead.

    BigQuery Read API.

    The Read API can be used to read data from BigQuery.

    This class defines the same methods as the primary client, so the
    primary client can load the underlying transport implementation
    and call it.

    It sends protocol buffers over the wire using gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    # The async gRPC channel used for all RPCs on this transport.
    _grpc_channel: aio.Channel
    # Cache of RPC stubs, keyed by method name; lazily populated by the
    # RPC properties below so each stub is created at most once.
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Create and return a gRPC AsyncIO channel object.
        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify this application to the service. If
                none are specified, the client will attempt to ascertain
                the credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            kwargs (Optional[dict]): Keyword arguments, which are passed to the
                channel creation.
        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """

        # Delegate to api_core, which resolves default credentials/scopes
        # and builds a secure async channel against the default host.
        return grpc_helpers_async.create_channel(
            host,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            default_scopes=cls.AUTH_SCOPES,
            scopes=scopes,
            default_host=cls.DEFAULT_HOST,
            **kwargs,
        )

    def __init__(
        self,
        *,
        host: str = "bigquerystorage.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[aio.Channel] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                This argument is ignored if ``channel`` is provided.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is ignored if ``channel`` is provided.
            scopes (Optional[Sequence[str]]): An optional list of scopes needed for this
                service. These are only used when credentials are not specified and
                are passed to :func:`google.auth.default`.
            channel (Optional[aio.Channel]): A ``Channel`` instance through
                which to make calls.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
                If provided, it overrides the ``host`` argument and tries to create
                a mutual TLS channel with client SSL credentials from
                ``client_cert_source`` or application default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate bytes and
                private key bytes, both in PEM format. It is ignored if
                ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
                for the grpc channel. It is ignored if ``channel`` is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private key bytes,
                both in PEM format. It is used to configure a mutual TLS channel. It is
                ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            api_audience (Optional[str]): Forwarded unchanged to the base
                transport's constructor.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS transport
              creation failed for any reason.
          google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
              and ``credentials_file`` are passed.
        """
        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if channel:
            # Ignore credentials if a channel was passed.
            credentials = False
            # If a channel was explicitly provided, set it.
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        else:
            if api_mtls_endpoint:
                host = api_mtls_endpoint

                # Create SSL credentials with client_cert_source or application
                # default SSL credentials.
                if client_cert_source:
                    cert, key = client_cert_source()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )
                else:
                    self._ssl_channel_credentials = SslCredentials().ssl_credentials

            else:
                if client_cert_source_for_mtls and not ssl_channel_credentials:
                    cert, key = client_cert_source_for_mtls()
                    self._ssl_channel_credentials = grpc.ssl_channel_credentials(
                        certificate_chain=cert, private_key=key
                    )

        # The base transport sets the host, credentials and scopes
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # No channel was injected, so build one using the credentials
            # resolved by the base transport above.
            self._grpc_channel = type(self).create_channel(
                self._host,
                # use the credentials which are saved
                credentials=self._credentials,
                # Set ``credentials_file`` to ``None`` here as
                # the credentials that we saved earlier should be used.
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=[
                    # Unlimited message sizes: BigQuery Storage responses can
                    # be large (ReadRows streams up to ~100 MiB per response).
                    ("grpc.max_send_message_length", -1),
                    ("grpc.max_receive_message_length", -1),
                ],
            )

        # Wrap messages. This must be done after self._grpc_channel exists
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Create the channel designed to connect to this service.

        This property caches on the instance; repeated calls return
        the same channel.
        """
        # Return the channel from cache.
        return self._grpc_channel

    @property
    def create_read_session(
        self,
    ) -> Callable[[storage.CreateReadSessionRequest], Awaitable[stream.ReadSession]]:
        r"""Return a callable for the create read session method over gRPC.

        Creates a new read session. A read session divides
        the contents of a BigQuery table into one or more
        streams, which can then be used to read data from the
        table. The read session also specifies properties of the
        data to be read, such as a list of columns or a
        push-down filter describing the rows to be returned.

        A particular row can be read by at most one stream. When
        the caller has reached the end of each stream in the
        session, then all the data in the table has been read.

        Data is assigned to each stream such that roughly the
        same number of rows can be read from each stream.
        Because the server-side unit for assigning data is
        collections of rows, the API does not guarantee that
        each stream will return the same number or rows.
        Additionally, the limits are enforced based on the
        number of pre-filtered rows, so some filters can lead to
        lopsided assignments.

        Read sessions automatically expire 6 hours after they
        are created and do not require manual clean-up by the
        caller.

        Returns:
            Callable[[~.CreateReadSessionRequest],
                    Awaitable[~.ReadSession]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "create_read_session" not in self._stubs:
            self._stubs["create_read_session"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/CreateReadSession",
                request_serializer=storage.CreateReadSessionRequest.serialize,
                response_deserializer=stream.ReadSession.deserialize,
            )
        return self._stubs["create_read_session"]

    @property
    def read_rows(
        self,
    ) -> Callable[[storage.ReadRowsRequest], Awaitable[storage.ReadRowsResponse]]:
        r"""Return a callable for the read rows method over gRPC.

        Reads rows from the stream in the format prescribed
        by the ReadSession. Each response contains one or more
        table rows, up to a maximum of 100 MiB per response;
        read requests which attempt to read individual rows
        larger than 100 MiB will fail.

        Each request also returns a set of stream statistics
        reflecting the current state of the stream.

        Returns:
            Callable[[~.ReadRowsRequest],
                    Awaitable[~.ReadRowsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "read_rows" not in self._stubs:
            # Server-streaming RPC: unary request, stream of responses.
            self._stubs["read_rows"] = self.grpc_channel.unary_stream(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/ReadRows",
                request_serializer=storage.ReadRowsRequest.serialize,
                response_deserializer=storage.ReadRowsResponse.deserialize,
            )
        return self._stubs["read_rows"]

    @property
    def split_read_stream(
        self,
    ) -> Callable[
        [storage.SplitReadStreamRequest], Awaitable[storage.SplitReadStreamResponse]
    ]:
        r"""Return a callable for the split read stream method over gRPC.

        Splits a given ``ReadStream`` into two ``ReadStream`` objects.
        These ``ReadStream`` objects are referred to as the primary and
        the residual streams of the split. The original ``ReadStream``
        can still be read from in the same manner as before. Both of the
        returned ``ReadStream`` objects can also be read from, and the
        rows returned by both child streams will be the same as the rows
        read from the original stream.

        Moreover, the two child streams will be allocated back-to-back
        in the original ``ReadStream``. Concretely, it is guaranteed
        that for streams original, primary, and residual, that
        original[0-j] = primary[0-j] and original[j-n] = residual[0-m]
        once the streams have been read to completion.

        Returns:
            Callable[[~.SplitReadStreamRequest],
                    Awaitable[~.SplitReadStreamResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        # Generate a "stub function" on-the-fly which will actually make
        # the request.
        # gRPC handles serialization and deserialization, so we just need
        # to pass in the functions for each.
        if "split_read_stream" not in self._stubs:
            self._stubs["split_read_stream"] = self.grpc_channel.unary_unary(
                "/google.cloud.bigquery.storage.v1.BigQueryRead/SplitReadStream",
                request_serializer=storage.SplitReadStreamRequest.serialize,
                response_deserializer=storage.SplitReadStreamResponse.deserialize,
            )
        return self._stubs["split_read_stream"]

    def close(self):
        """Close the underlying gRPC channel.

        NOTE(review): ``aio.Channel.close()`` returns a coroutine, so the
        return value of this method must be awaited by the caller.
        """
        return self.grpc_channel.close()

359 

360 

# Public API of this module: only the transport class is exported.
__all__ = ("BigQueryReadGrpcAsyncIOTransport",)