Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

56 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2025 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19import google.api_core 

20from google.api_core import exceptions as core_exceptions 

21from google.api_core import gapic_v1 

22from google.api_core import retry as retries 

23import google.auth # type: ignore 

24from google.auth import credentials as ga_credentials # type: ignore 

25from google.oauth2 import service_account # type: ignore 

26import google.protobuf 

27 

28from google.cloud.bigquery_storage_v1 import gapic_version as package_version 

29from google.cloud.bigquery_storage_v1.types import storage, stream 

30 

31DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( 

32 gapic_version=package_version.__version__ 

33) 

34 

35if hasattr(DEFAULT_CLIENT_INFO, "protobuf_runtime_version"): # pragma: NO COVER 

36 DEFAULT_CLIENT_INFO.protobuf_runtime_version = google.protobuf.__version__ 

37 

38 

39class BigQueryReadTransport(abc.ABC): 

40 """Abstract transport class for BigQueryRead.""" 

41 

42 AUTH_SCOPES = ( 

43 "https://www.googleapis.com/auth/bigquery", 

44 "https://www.googleapis.com/auth/cloud-platform", 

45 ) 

46 

47 DEFAULT_HOST: str = "bigquerystorage.googleapis.com" 

48 

49 def __init__( 

50 self, 

51 *, 

52 host: str = DEFAULT_HOST, 

53 credentials: Optional[ga_credentials.Credentials] = None, 

54 credentials_file: Optional[str] = None, 

55 scopes: Optional[Sequence[str]] = None, 

56 quota_project_id: Optional[str] = None, 

57 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO, 

58 always_use_jwt_access: Optional[bool] = False, 

59 api_audience: Optional[str] = None, 

60 **kwargs, 

61 ) -> None: 

62 """Instantiate the transport. 

63 

64 Args: 

65 host (Optional[str]): 

66 The hostname to connect to (default: 'bigquerystorage.googleapis.com'). 

67 credentials (Optional[google.auth.credentials.Credentials]): The 

68 authorization credentials to attach to requests. These 

69 credentials identify the application to the service; if none 

70 are specified, the client will attempt to ascertain the 

71 credentials from the environment. 

72 credentials_file (Optional[str]): A file with credentials that can 

73 be loaded with :func:`google.auth.load_credentials_from_file`. 

74 This argument is mutually exclusive with credentials. 

75 scopes (Optional[Sequence[str]]): A list of scopes. 

76 quota_project_id (Optional[str]): An optional project to use for billing 

77 and quota. 

78 client_info (google.api_core.gapic_v1.client_info.ClientInfo): 

79 The client info used to send a user-agent string along with 

80 API requests. If ``None``, then default info will be used. 

81 Generally, you only need to set this if you're developing 

82 your own client library. 

83 always_use_jwt_access (Optional[bool]): Whether self signed JWT should 

84 be used for service account credentials. 

85 """ 

86 

87 scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES} 

88 

89 # Save the scopes. 

90 self._scopes = scopes 

91 if not hasattr(self, "_ignore_credentials"): 

92 self._ignore_credentials: bool = False 

93 

94 # If no credentials are provided, then determine the appropriate 

95 # defaults. 

96 if credentials and credentials_file: 

97 raise core_exceptions.DuplicateCredentialArgs( 

98 "'credentials_file' and 'credentials' are mutually exclusive" 

99 ) 

100 

101 if credentials_file is not None: 

102 credentials, _ = google.auth.load_credentials_from_file( 

103 credentials_file, **scopes_kwargs, quota_project_id=quota_project_id 

104 ) 

105 elif credentials is None and not self._ignore_credentials: 

106 credentials, _ = google.auth.default( 

107 **scopes_kwargs, quota_project_id=quota_project_id 

108 ) 

109 # Don't apply audience if the credentials file passed from user. 

110 if hasattr(credentials, "with_gdch_audience"): 

111 credentials = credentials.with_gdch_audience( 

112 api_audience if api_audience else host 

113 ) 

114 

115 # If the credentials are service account credentials, then always try to use self signed JWT. 

116 if ( 

117 always_use_jwt_access 

118 and isinstance(credentials, service_account.Credentials) 

119 and hasattr(service_account.Credentials, "with_always_use_jwt_access") 

120 ): 

121 credentials = credentials.with_always_use_jwt_access(True) 

122 

123 # Save the credentials. 

124 self._credentials = credentials 

125 

126 # Save the hostname. Default to port 443 (HTTPS) if none is specified. 

127 if ":" not in host: 

128 host += ":443" 

129 self._host = host 

130 

131 @property 

132 def host(self): 

133 return self._host 

134 

135 def _prep_wrapped_messages(self, client_info): 

136 # Precompute the wrapped methods. 

137 self._wrapped_methods = { 

138 self.create_read_session: gapic_v1.method.wrap_method( 

139 self.create_read_session, 

140 default_retry=retries.Retry( 

141 initial=0.1, 

142 maximum=60.0, 

143 multiplier=1.3, 

144 predicate=retries.if_exception_type( 

145 core_exceptions.DeadlineExceeded, 

146 core_exceptions.ServiceUnavailable, 

147 ), 

148 deadline=600.0, 

149 ), 

150 default_timeout=600.0, 

151 client_info=client_info, 

152 ), 

153 self.read_rows: gapic_v1.method.wrap_method( 

154 self.read_rows, 

155 default_retry=retries.Retry( 

156 initial=0.1, 

157 maximum=60.0, 

158 multiplier=1.3, 

159 predicate=retries.if_exception_type( 

160 core_exceptions.ServiceUnavailable, 

161 ), 

162 deadline=86400.0, 

163 ), 

164 default_timeout=86400.0, 

165 client_info=client_info, 

166 ), 

167 self.split_read_stream: gapic_v1.method.wrap_method( 

168 self.split_read_stream, 

169 default_retry=retries.Retry( 

170 initial=0.1, 

171 maximum=60.0, 

172 multiplier=1.3, 

173 predicate=retries.if_exception_type( 

174 core_exceptions.DeadlineExceeded, 

175 core_exceptions.ServiceUnavailable, 

176 ), 

177 deadline=600.0, 

178 ), 

179 default_timeout=600.0, 

180 client_info=client_info, 

181 ), 

182 } 

183 

184 def close(self): 

185 """Closes resources associated with the transport. 

186 

187 .. warning:: 

188 Only call this method if the transport is NOT shared 

189 with other clients - this may cause errors in other clients! 

190 """ 

191 raise NotImplementedError() 

192 

193 @property 

194 def create_read_session( 

195 self, 

196 ) -> Callable[ 

197 [storage.CreateReadSessionRequest], 

198 Union[stream.ReadSession, Awaitable[stream.ReadSession]], 

199 ]: 

200 raise NotImplementedError() 

201 

202 @property 

203 def read_rows( 

204 self, 

205 ) -> Callable[ 

206 [storage.ReadRowsRequest], 

207 Union[storage.ReadRowsResponse, Awaitable[storage.ReadRowsResponse]], 

208 ]: 

209 raise NotImplementedError() 

210 

211 @property 

212 def split_read_stream( 

213 self, 

214 ) -> Callable[ 

215 [storage.SplitReadStreamRequest], 

216 Union[ 

217 storage.SplitReadStreamResponse, Awaitable[storage.SplitReadStreamResponse] 

218 ], 

219 ]: 

220 raise NotImplementedError() 

221 

222 @property 

223 def kind(self) -> str: 

224 raise NotImplementedError() 

225 

226 

227__all__ = ("BigQueryReadTransport",)