Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py: 56%

50 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:10 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

20 

21import google.auth # type: ignore 

22import google.api_core 

23from google.api_core import exceptions as core_exceptions 

24from google.api_core import gapic_v1 

25from google.api_core import retry as retries 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.oauth2 import service_account # type: ignore 

28 

29from google.cloud.bigquery_storage_v1.types import storage 

30from google.cloud.bigquery_storage_v1.types import stream 

31 

# Client info used when a caller does not pass ``client_info``; it carries
# this package's GAPIC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(gapic_version=package_version.__version__)

35 

36 

class BigQueryReadTransport(abc.ABC):
    """Base class that concrete BigQueryRead transports derive from.

    The constructor resolves the credentials to use and normalizes the host
    name.  The RPC properties (``create_read_session``, ``read_rows`` and
    ``split_read_stream``), ``kind`` and ``close`` all raise
    ``NotImplementedError`` here and are meant to be provided by subclasses.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/cloud-platform",
    )

    DEFAULT_HOST: str = "bigquerystorage.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Set up credentials, scopes and host for the transport.

        Args:
            host (Optional[str]):
                Hostname to connect to.  ``":443"`` is appended when the
                value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]):
                Authorization credentials to attach to requests.  When
                neither this nor ``credentials_file`` is given, credentials
                are obtained from :func:`google.auth.default`.
            credentials_file (Optional[str]): Path of a credentials file,
                loaded with :func:`google.auth.load_credentials_from_file`.
                Mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): Scopes to request; stored
                unchanged on the instance.  ``AUTH_SCOPES`` is passed as the
                default scopes whenever credentials are loaded here.
            quota_project_id (Optional[str]): Optional project to use for
                billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info used to send a user-agent string along with API
                requests.  It is not read by this constructor.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when environment-derived credentials
                expose that method; ``host`` is used when this is empty.
            **kwargs: Accepted but not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Keep the scopes exactly as the caller supplied them.
        self._scopes = scopes

        # Two credential sources at once is ambiguous, so refuse it outright.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file,
                scopes=scopes,
                default_scopes=self.AUTH_SCOPES,
                quota_project_id=quota_project_id,
            )
        elif credentials is None:
            credentials, _ = google.auth.default(
                scopes=scopes,
                default_scopes=self.AUTH_SCOPES,
                quota_project_id=quota_project_id,
            )
            # The audience is applied only to these environment-derived
            # credentials, never to ones loaded from a user-supplied file.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Service account credentials switch to self signed JWT when asked to,
        # provided the installed auth library offers the hook.
        wants_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if wants_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Fall back to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    def _prep_wrapped_messages(self, client_info):
        """Build ``self._wrapped_methods`` with default retry and timeout.

        Each RPC callable is mapped to the result of
        ``gapic_v1.method.wrap_method`` for that callable.  The retry deadline
        and the default timeout are the same number of seconds per method.

        Args:
            client_info: Client info forwarded to every ``wrap_method`` call.
        """

        def _with_defaults(rpc, retryable_errors, seconds):
            # All three RPCs share the backoff settings; only the retryable
            # exception types and the overall time budget differ.
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(*retryable_errors),
                    deadline=seconds,
                ),
                default_timeout=seconds,
                client_info=client_info,
            )

        self._wrapped_methods = {
            self.create_read_session: _with_defaults(
                self.create_read_session,
                (
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                600.0,
            ),
            self.read_rows: _with_defaults(
                self.read_rows,
                (core_exceptions.ServiceUnavailable,),
                86400.0,
            ),
            self.split_read_stream: _with_defaults(
                self.split_read_stream,
                (
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                600.0,
            ),
        }

    def close(self):
        """Release the resources held by the transport.

        .. warning::
            Only call this method if the transport is NOT shared
            with other clients - this may cause errors in other clients!

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def create_read_session(
        self,
    ) -> Callable[
        [storage.CreateReadSessionRequest],
        Union[stream.ReadSession, Awaitable[stream.ReadSession]],
    ]:
        """Callable for the create-read-session RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def read_rows(
        self,
    ) -> Callable[
        [storage.ReadRowsRequest],
        Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]],
    ]:
        """Callable for the read-rows RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def split_read_stream(
        self,
    ) -> Callable[
        [storage.SplitReadStreamRequest],
        Union[
            storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse]
        ],
    ]:
        """Callable for the split-read-stream RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String describing the transport; not implemented here."""
        raise NotImplementedError()

217 

218 

219__all__ = ("BigQueryReadTransport",)