Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/operations_v1/transports/base.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1# -*- coding: utf-8 -*- 

2# Copyright 2020 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17import re 

18from typing import Awaitable, Callable, Optional, Sequence, Union 

19 

20import google.api_core # type: ignore 

21from google.api_core import exceptions as core_exceptions # type: ignore 

22from google.api_core import gapic_v1 # type: ignore 

23from google.api_core import retry as retries # type: ignore 

24from google.api_core import version 

25import google.auth # type: ignore 

26from google.auth import credentials as ga_credentials # type: ignore 

27from google.longrunning import operations_pb2 

28from google.oauth2 import service_account # type: ignore 

29import google.protobuf 

30from google.protobuf import empty_pb2, json_format # type: ignore 

31from grpc import Compression 

32 

33 

# Version string of the installed protobuf runtime. It is consulted by
# OperationsTransport._convert_protobuf_message_to_dict to pick the keyword
# argument that json_format.MessageToDict is called with.
PROTOBUF_VERSION = google.protobuf.__version__

# Client info stamped with this package's version; it is the default value of
# the ``client_info`` parameter of OperationsTransport.__init__.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=version.__version__,
)

39 

40 

class OperationsTransport(abc.ABC):
    """Abstract transport class for Operations.

    The base class resolves the endpoint URL and the credentials, and offers
    helpers shared by concrete transports. The RPC properties
    (``list_operations``, ``get_operation``, ``delete_operation``,
    ``cancel_operation``) and ``close`` raise ``NotImplementedError`` here and
    have to be overridden by concrete transports.
    """

    AUTH_SCOPES = ()

    DEFAULT_HOST: str = "longrunning.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        # TODO(https://github.com/googleapis/python-api-core/issues/709): update type hint for credentials to include `google.auth.aio.Credentials`.
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        url_scheme="https",
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                 The hostname to connect to.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.

                .. warning::
                    Important: If you accept a credential configuration (credential JSON/File/Stream)
                    from an external source for authentication to Google Cloud Platform, you must
                    validate it before providing it to any Google API or client library. Providing an
                    unvalidated credential configuration to Google APIs or libraries can compromise
                    the security of your systems and data. For more information, refer to
                    `Validate credential configurations from external sources`_.

                .. _Validate credential configurations from external sources:

                https://cloud.google.com/docs/authentication/external/externally-sourced-credentials
            scopes (Optional[Sequence[str]]): A list of scopes.
            quota_project_id (Optional[str]): An optional project to use for billing
                and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library. This base initializer does not read
                the value itself.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT should
                be used for service account credentials.
            url_scheme: the protocol scheme for the API endpoint.  Normally
                "https", but for testing or local servers,
                "http" can be specified.
            **kwargs: Accepted for signature compatibility; not read by this
                base initializer.

        Raises:
            ValueError: If ``host`` does not match the expected URL pattern.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are given.
        """
        maybe_url_match = re.match(r"^(?P<scheme>http(?:s)?://)?(?P<host>.*)$", host)
        if maybe_url_match is None:
            raise ValueError(
                f"Unexpected hostname structure: {host}"
            )  # pragma: NO COVER

        url_match_items = maybe_url_match.groupdict()

        # Prefix the scheme only when the caller did not already supply one.
        if not url_match_items["scheme"]:
            host = f"{url_scheme}://{host}"

        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        # NOTE(review): at this point ``host`` always contains "://", so this
        # condition is never true and no port is ever appended. Left as-is
        # because changing it would alter the stored ``_host`` value.
        if ":" not in host:
            host += ":443"  # pragma: NO COVER
        self._host = host

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes

        # If no credentials are provided, then determine the appropriate
        # defaults.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )

        elif credentials is None:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )

        # If the credentials are service account credentials, then always try to use self signed JWT.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

    @staticmethod
    def _wrap_operation_method(method, client_info):
        """Wrap one RPC callable with the defaults shared by all operations RPCs.

        Args:
            method: The callable to wrap.
            client_info: Client info forwarded to ``gapic_v1.method.wrap_method``.

        Returns:
            The result of ``gapic_v1.method.wrap_method`` for ``method``.
        """
        # A fresh Retry object is built per call, as the original inline code did.
        return gapic_v1.method.wrap_method(
            method,
            default_retry=retries.Retry(
                initial=0.5,
                maximum=10.0,
                multiplier=2.0,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=10.0,
            ),
            default_timeout=10.0,
            default_compression=Compression.NoCompression,
            client_info=client_info,
        )

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods and store them in ``self._wrapped_methods``.

        The resulting dict maps each RPC callable exposed by this transport to
        its wrapped counterpart; all four RPCs share the same retry, timeout
        and compression defaults.

        Args:
            client_info: Client info forwarded to ``gapic_v1.method.wrap_method``.
        """
        self._wrapped_methods = {
            method: self._wrap_operation_method(method, client_info)
            for method in (
                self.list_operations,
                self.get_operation,
                self.delete_operation,
                self.cancel_operation,
            )
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!
        """
        raise NotImplementedError()

    def _convert_protobuf_message_to_dict(
        self, message: google.protobuf.message.Message
    ):
        r"""Converts protobuf message to a dictionary.

        When the dictionary is encoded to JSON, it conforms to proto3 JSON spec.

        Args:
            message(google.protobuf.message.Message): The protocol buffers message
                instance to serialize.

        Returns:
            A dict representation of the protocol buffer message.
        """
        # TODO(https://github.com/googleapis/python-api-core/issues/643): For backwards compatibility
        # with protobuf 3.x 4.x, Remove once support for protobuf 3.x and 4.x is dropped.
        if PROTOBUF_VERSION.startswith(("3.", "4.")):
            return json_format.MessageToDict(
                message,
                preserving_proto_field_name=True,
                including_default_value_fields=True,  # type: ignore # backward compatibility
            )

        return json_format.MessageToDict(
            message,
            preserving_proto_field_name=True,
            always_print_fields_with_no_presence=True,
        )

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest],
        Union[
            operations_pb2.ListOperationsResponse,
            Awaitable[operations_pb2.ListOperationsResponse],
        ],
    ]:
        """Callable for the ListOperations RPC; not implemented in the base class."""
        raise NotImplementedError()

    @property
    def get_operation(
        self,
    ) -> Callable[
        [operations_pb2.GetOperationRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Callable for the GetOperation RPC; not implemented in the base class."""
        raise NotImplementedError()

    @property
    def delete_operation(
        self,
    ) -> Callable[
        [operations_pb2.DeleteOperationRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the DeleteOperation RPC; not implemented in the base class."""
        raise NotImplementedError()

    @property
    def cancel_operation(
        self,
    ) -> Callable[
        [operations_pb2.CancelOperationRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Callable for the CancelOperation RPC; not implemented in the base class."""
        raise NotImplementedError()

292 

293 

# Names exported by ``from <this module> import *``.
__all__ = ("OperationsTransport",)