Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_write/transports/base.py: 58%

59 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.cloud.bigquery_storage_v1.types import storage 

30from google.cloud.bigquery_storage_v1.types import stream 

31 

32DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

33 gapic_version=package_version.__version__ 

34) 

35 

36 

37class BigQueryWriteTransport(abc.ABC): 

38 """Abstract transport class for BigQueryWrite.""" 

39 

40 AUTH_SCOPES = ( 

41 "https://www.googleapis.com/auth/bigquery", 

42 "https://www.googleapis.com/auth/bigquery.insertdata", 

43 "https://www.googleapis.com/auth/cloud-platform", 

44 ) 

45 

46 DEFAULT_HOST: str = "bigquerystorage.googleapis.com" 

47 

48 def __init__( 

49 self, 

50 *, 

51 host: str = DEFAULT_HOST, 

52 credentials: Optional[ga_credentials.Credentials] = None, 

53 credentials_file: Optional[str] = None, 

54 scopes: Optional[Sequence[str]] = None, 

55 quota_project_id: Optional[str] = None, 

56 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

57 always_use_jwt_access: Optional[bool] = False, 

58 api_audience: Optional[str] = None, 

59 **kwargs, 

60 ) -> None: 

61 """Instantiate the transport. 

62 

63 Args: 

64 host (Optional[str]): 

65 The hostname to connect to. 

66 credentials (Optional[google.auth.credentials.Credentials]): The 

67 authorization credentials to attach to requests. These 

68 credentials identify the application to the service; if none 

69 are specified, the client will attempt to ascertain the 

70 credentials from the environment. 

71 credentials_file (Optional[str]): A file with credentials that can 

72 be loaded with :func:`google.auth.load_credentials_from_file`. 

73 This argument is mutually exclusive with credentials. 

74 scopes (Optional[Sequence[str]]): A list of scopes. 

75 quota_project_id (Optional[str]): An optional project to use for billing 

76 and quota. 

77 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

78 The client info used to send a user-agent string along with 

79 API requests. If ``None``, then default info will be used. 

80 Generally, you only need to set this if you're developing 

81 your own client library. 

82 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

83 be used for service account credentials. 

84 """ 

85 

86 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

87 

88 # Save the scopes. 

89 self._scopes = scopes 

90 

91 # If no credentials are provided, then determine the appropriate 

92 # defaults. 

93 if credentials and credentials_file: 

94 raise core_exceptions.DuplicateCredentialArgs( 

95 "'credentials_file' and 'credentials' are mutually exclusive" 

96 ) 

97 

98 if credentials_file is not None: 

99 credentials, _ = google.auth.load_credentials_from_file( 

100 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

101 ) 

102 elif credentials is None: 

103 credentials, _ = google.auth.default( 

104 **scopes_kwargs, quota_project_id=quota_project_id 

105 ) 

106 # Don't apply audience if the credentials file passed from user. 

107 if hasattr(credentials, "with_gdch_audience"): 

108 credentials = credentials.with_gdch_audience( 

109 api_audience if api_audience else host 

110 ) 

111 

112 # If the credentials are service account credentials, then always try to use self signed JWT. 

113 if ( 

114 always_use_jwt_access 

115 and isinstance(credentials, service_account.Credentials) 

116 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

117 ): 

118 credentials = credentials.with_always_use_jwt_access(True) 

119 

120 # Save the credentials. 

121 self._credentials = credentials 

122 

123 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

124 if ":" not in host: 

125 host += ":443" 

126 self._host = host 

127 

128 def _prep_wrapped_messages(self, client_info): 

129 # Precompute the wrapped methods. 

130 self._wrapped_methods = { 

131 self.create_write_stream: gapic_v1.method.wrap_method( 

132 self.create_write_stream, 

133 default_retry=retries.Retry( 

134 initial=10.0, 

135 maximum=120.0, 

136 multiplier=1.3, 

137 predicate=retries.if_exception_type( 

138 core_exceptions.DeadlineExceeded, 

139 core_exceptions.ResourceExhausted, 

140 core_exceptions.ServiceUnavailable, 

141 ), 

142 deadline=1200.0, 

143 ), 

144 default_timeout=1200.0, 

145 client_info=client_info, 

146 ), 

147 self.append_rows: gapic_v1.method.wrap_method( 

148 self.append_rows, 

149 default_retry=retries.Retry( 

150 initial=0.1, 

151 maximum=60.0, 

152 multiplier=1.3, 

153 predicate=retries.if_exception_type( 

154 core_exceptions.ServiceUnavailable, 

155 ), 

156 deadline=86400.0, 

157 ), 

158 default_timeout=86400.0, 

159 client_info=client_info, 

160 ), 

161 self.get_write_stream: gapic_v1.method.wrap_method( 

162 self.get_write_stream, 

163 default_retry=retries.Retry( 

164 initial=0.1, 

165 maximum=60.0, 

166 multiplier=1.3, 

167 predicate=retries.if_exception_type( 

168 core_exceptions.DeadlineExceeded, 

169 core_exceptions.ServiceUnavailable, 

170 ), 

171 deadline=600.0, 

172 ), 

173 default_timeout=600.0, 

174 client_info=client_info, 

175 ), 

176 self.finalize_write_stream: gapic_v1.method.wrap_method( 

177 self.finalize_write_stream, 

178 default_retry=retries.Retry( 

179 initial=0.1, 

180 maximum=60.0, 

181 multiplier=1.3, 

182 predicate=retries.if_exception_type( 

183 core_exceptions.DeadlineExceeded, 

184 core_exceptions.ServiceUnavailable, 

185 ), 

186 deadline=600.0, 

187 ), 

188 default_timeout=600.0, 

189 client_info=client_info, 

190 ), 

191 self.batch_commit_write_streams: gapic_v1.method.wrap_method( 

192 self.batch_commit_write_streams, 

193 default_retry=retries.Retry( 

194 initial=0.1, 

195 maximum=60.0, 

196 multiplier=1.3, 

197 predicate=retries.if_exception_type( 

198 core_exceptions.DeadlineExceeded, 

199 core_exceptions.ServiceUnavailable, 

200 ), 

201 deadline=600.0, 

202 ), 

203 default_timeout=600.0, 

204 client_info=client_info, 

205 ), 

206 self.flush_rows: gapic_v1.method.wrap_method( 

207 self.flush_rows, 

208 default_retry=retries.Retry( 

209 initial=0.1, 

210 maximum=60.0, 

211 multiplier=1.3, 

212 predicate=retries.if_exception_type( 

213 core_exceptions.DeadlineExceeded, 

214 core_exceptions.ServiceUnavailable, 

215 ), 

216 deadline=600.0, 

217 ), 

218 default_timeout=600.0, 

219 client_info=client_info, 

220 ), 

221 } 

222 

223 def close(self): 

224 """Closes resources associated with the transport. 

225 

226 .. warning:: 

227 Only call this method if the transport is NOT shared 

228 with other clients - this may cause errors in other clients! 

229 """ 

230 raise NotImplementedError() 

231 

232 @property 

233 def create_write_stream( 

234 self, 

235 ) -> Callable[ 

236 [storage.CreateWriteStreamRequest], 

237 Union[stream.WriteStream, Awaitable[stream.WriteStream]], 

238 ]: 

239 raise NotImplementedError() 

240 

241 @property 

242 def append_rows( 

243 self, 

244 ) -> Callable[ 

245 [storage.AppendRowsRequest], 

246 Union[storage.AppendRowsResponse, Awaitable[storage.AppendRowsResponse]], 

247 ]: 

248 raise NotImplementedError() 

249 

250 @property 

251 def get_write_stream( 

252 self, 

253 ) -> Callable[ 

254 [storage.GetWriteStreamRequest], 

255 Union[stream.WriteStream, Awaitable[stream.WriteStream]], 

256 ]: 

257 raise NotImplementedError() 

258 

259 @property 

260 def finalize_write_stream( 

261 self, 

262 ) -> Callable[ 

263 [storage.FinalizeWriteStreamRequest], 

264 Union[ 

265 storage.FinalizeWriteStreamResponse, 

266 Awaitable[storage.FinalizeWriteStreamResponse], 

267 ], 

268 ]: 

269 raise NotImplementedError() 

270 

271 @property 

272 def batch_commit_write_streams( 

273 self, 

274 ) -> Callable[ 

275 [storage.BatchCommitWriteStreamsRequest], 

276 Union[ 

277 storage.BatchCommitWriteStreamsResponse, 

278 Awaitable[storage.BatchCommitWriteStreamsResponse], 

279 ], 

280 ]: 

281 raise NotImplementedError() 

282 

283 @property 

284 def flush_rows( 

285 self, 

286 ) -> Callable[ 

287 [storage.FlushRowsRequest], 

288 Union[storage.FlushRowsResponse, Awaitable[storage.FlushRowsResponse]], 

289 ]: 

290 raise NotImplementedError() 

291 

292 @property 

293 def kind(self) -> str: 

294 raise NotImplementedError() 

295 

296 

297__all__ = ("BigQueryWriteTransport",)