Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

52 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2024 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19import google.api_core 

20from google.api_core import exceptions as core_exceptions 

21from google.api_core import gapic_v1 

22from google.api_core import retry as retries 

23import google.auth # type: ignore 

24from google.auth import credentials as ga_credentials # type: ignore 

25from google.oauth2 import service_account # type: ignore 

26 

27from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

28from google.cloud.bigquery_storage_v1.types import storage, stream 

29 

# Default client info for this package; it carries the package's GAPIC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(gapic_version=package_version.__version__)

33 

34 

class BigQueryReadTransport(abc.ABC):
    """Base transport definition for the BigQueryRead service.

    The constructor resolves the credentials and the target host. The RPC
    properties, ``close`` and ``kind`` all raise ``NotImplementedError`` in
    this class.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/cloud-platform",
    )

    DEFAULT_HOST: str = "bigquerystorage.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Resolve credentials and the host for this transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to (default: 'bigquerystorage.googleapis.com').
                 ``":443"`` is appended when the value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]):
                Credentials to store on the transport. When neither this nor
                ``credentials_file`` is given, ``google.auth.default`` is
                called to obtain them.
            credentials_file (Optional[str]): Path passed to
                :func:`google.auth.load_credentials_from_file`. Mutually
                exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): Scopes forwarded to google.auth
                together with ``AUTH_SCOPES`` as the default scopes.
            quota_project_id (Optional[str]): Quota project forwarded to
                google.auth when credentials are loaded here.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Accepted for interface compatibility; this constructor does
                not read it (``_prep_wrapped_messages`` takes its own
                ``client_info`` argument).
            always_use_jwt_access (Optional[bool]): When truthy and the
                resolved credentials are service account credentials that
                support it, self signed JWT access is switched on.
            api_audience (Optional[str]): Audience given to
                ``with_gdch_audience`` on credentials obtained from
                ``google.auth.default``; ``host`` is used when this is falsy.
            **kwargs: Accepted and not used by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # One keyword set serves both google.auth entry points below.
        auth_kwargs = {
            "scopes": scopes,
            "default_scopes": self.AUTH_SCOPES,
            "quota_project_id": quota_project_id,
        }

        # Keep the scopes exactly as the caller passed them.
        self._scopes = scopes

        # Explicit credentials and a credentials file cannot be combined.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **auth_kwargs
            )
        elif credentials is None:
            credentials, _ = google.auth.default(**auth_kwargs)
            # The audience is applied only on this default-credentials path,
            # never to credentials or a credentials file supplied by the user.
            if hasattr(credentials, "with_gdch_audience"):
                audience = api_audience or host
                credentials = credentials.with_gdch_audience(audience)

        # Service account credentials switch to self signed JWT when requested
        # and when the installed google-auth exposes the method.
        use_self_signed_jwt = (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        )
        if use_self_signed_jwt:
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Fall back to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    @property
    def host(self):
        """Return the host stored by the constructor (port included)."""
        return self._host

    def _prep_wrapped_messages(self, client_info):
        """Build ``self._wrapped_methods`` with per-RPC retry and timeout defaults.

        Each RPC callable is used as the key and mapped to the result of
        ``gapic_v1.method.wrap_method`` for that same callable.

        Args:
            client_info: Passed through to every ``wrap_method`` call.
        """

        def _wrap(rpc, *, retry_on, limit):
            # All three RPCs share the same backoff shape; only the retryable
            # exception types and the deadline/timeout value differ.
            return gapic_v1.method.wrap_method(
                rpc,
                default_retry=retries.Retry(
                    initial=0.1,
                    maximum=60.0,
                    multiplier=1.3,
                    predicate=retries.if_exception_type(*retry_on),
                    deadline=limit,
                ),
                default_timeout=limit,
                client_info=client_info,
            )

        self._wrapped_methods = {
            self.create_read_session: _wrap(
                self.create_read_session,
                retry_on=(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                limit=600.0,
            ),
            self.read_rows: _wrap(
                self.read_rows,
                retry_on=(core_exceptions.ServiceUnavailable,),
                limit=86400.0,
            ),
            self.split_read_stream: _wrap(
                self.split_read_stream,
                retry_on=(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.ServiceUnavailable,
                ),
                limit=600.0,
            ),
        }

    def close(self):
        """Release the resources held by the transport.

        .. warning::
             Call this only when the transport is NOT shared with other
             clients - closing a shared transport may cause errors in them!

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def create_read_session(
        self,
    ) -> Callable[
        [storage.CreateReadSessionRequest],
        Union[stream.ReadSession, Awaitable[stream.ReadSession]],
    ]:
        """Callable for the create-read-session RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def read_rows(
        self,
    ) -> Callable[
        [storage.ReadRowsRequest],
        Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]],
    ]:
        """Callable for the read-rows RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def split_read_stream(
        self,
    ) -> Callable[
        [storage.SplitReadStreamRequest],
        Union[
            storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse]
        ],
    ]:
        """Callable for the split-read-stream RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String describing the transport kind; not implemented here."""
        raise NotImplementedError()

219 

220 

# Public names exported by a star-import of this module.
__all__ = ("BigQueryReadTransport",)