Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/resourcemanager_v3/services/tag_values/transports/base.py: 75%

76 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2022 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Dict, Optional, Sequence, Union 

18 

19import google.api_core 

20from google.api_core import exceptions as core_exceptions 

21from google.api_core import gapic_v1, operations_v1 

22from google.api_core import retry as retries 

23import google.auth # type: ignore 

24from google.auth import credentials as ga_credentials # type: ignore 

25from google.iam.v1 import iam_policy_pb2 # type: ignore 

26from google.iam.v1 import policy_pb2 # type: ignore 

27from google.longrunning import operations_pb2 # type: ignore 

28from google.oauth2 import service_account # type: ignore 

29 

30from google.cloud.resourcemanager_v3 import gapic_version as package_version 

31from google.cloud.resourcemanager_v3.types import tag_values 

32 

# Module-wide client info, stamped with this package's GAPIC version; it is the
# default value of the ``client_info`` parameter of the transport constructor.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
    gapic_version=package_version.__version__,
)

36 

37 

class TagValuesTransport(abc.ABC):
    """Abstract transport class for TagValues.

    Resolves credentials and the target host at construction time and
    declares one property per RPC.  Every RPC property (as well as
    ``close``, ``operations_client`` and ``kind``) raises
    ``NotImplementedError`` in this base class.
    """

    AUTH_SCOPES = (
        "https://www.googleapis.com/auth/cloud-platform",
        "https://www.googleapis.com/auth/cloud-platform.read-only",
    )

    DEFAULT_HOST: str = "cloudresourcemanager.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to. ``":443"`` is appended when the
                value contains no ``":"``.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. When neither
                this nor ``credentials_file`` is given, credentials are
                obtained from :func:`google.auth.default`.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with ``credentials``.
            scopes (Optional[Sequence[str]]): A list of scopes. Stored on the
                instance and forwarded (together with ``AUTH_SCOPES`` as the
                default scopes) whenever credentials are loaded here.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota; forwarded whenever credentials are loaded
                here.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Client info for API requests. This constructor does not read
                it.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Audience handed to
                ``with_gdch_audience`` when the default credentials expose
                that method; ``host`` is used instead when this is empty.
            **kwargs: Accepted and ignored by this constructor.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are supplied.
        """
        # Remember the scopes exactly as the caller supplied them.
        self._scopes = scopes

        # Explicit credentials and a credentials file cannot be combined.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # Keyword arguments shared by both credential-loading paths below.
        auth_kwargs = {
            "scopes": scopes,
            "default_scopes": self.AUTH_SCOPES,
            "quota_project_id": quota_project_id,
        }

        if credentials_file is not None:
            credentials, _project = google.auth.load_credentials_from_file(
                credentials_file, **auth_kwargs
            )
        elif credentials is None:
            credentials, _project = google.auth.default(**auth_kwargs)
            # The audience is only applied to default credentials, never to
            # credentials or a credentials file passed in by the user.
            if hasattr(credentials, "with_gdch_audience"):
                credentials = credentials.with_gdch_audience(api_audience or host)

        # Service account credentials switch to self signed JWT on request,
        # provided the installed auth library offers that method.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        self._credentials = credentials

        # Fall back to port 443 (HTTPS) when the host names no port.
        self._host = host if ":" in host else host + ":443"

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods.

        Fills ``self._wrapped_methods`` with one entry per RPC property,
        mapping the raw callable to the result of
        ``gapic_v1.method.wrap_method`` configured with that RPC's default
        retry and timeout.

        Args:
            client_info: Client info passed through to every ``wrap_method``
                call.
        """

        def _unavailable_retry():
            # A fresh Retry per RPC: retries on ServiceUnavailable only.
            return retries.Retry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=60.0,
            )

        def _wrap(rpc, **defaults):
            # Attach the shared client_info next to the per-RPC defaults.
            return gapic_v1.method.wrap_method(
                rpc, client_info=client_info, **defaults
            )

        self._wrapped_methods = {
            self.list_tag_values: _wrap(
                self.list_tag_values,
                default_retry=_unavailable_retry(),
                default_timeout=60.0,
            ),
            self.get_tag_value: _wrap(
                self.get_tag_value,
                default_retry=_unavailable_retry(),
                default_timeout=60.0,
            ),
            self.get_namespaced_tag_value: _wrap(
                self.get_namespaced_tag_value,
                default_timeout=None,
            ),
            self.create_tag_value: _wrap(
                self.create_tag_value,
                default_timeout=60.0,
            ),
            self.update_tag_value: _wrap(
                self.update_tag_value,
                default_timeout=60.0,
            ),
            self.delete_tag_value: _wrap(
                self.delete_tag_value,
                default_timeout=60.0,
            ),
            self.get_iam_policy: _wrap(
                self.get_iam_policy,
                default_retry=_unavailable_retry(),
                default_timeout=60.0,
            ),
            self.set_iam_policy: _wrap(
                self.set_iam_policy,
                default_timeout=60.0,
            ),
            self.test_iam_permissions: _wrap(
                self.test_iam_permissions,
                default_timeout=None,
            ),
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def operations_client(self):
        """Return the client designed to process long-running operations.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def list_tag_values(
        self,
    ) -> Callable[
        [tag_values.ListTagValuesRequest],
        Union[
            tag_values.ListTagValuesResponse,
            Awaitable[tag_values.ListTagValuesResponse],
        ],
    ]:
        """Callable for the list-tag-values RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def get_tag_value(
        self,
    ) -> Callable[
        [tag_values.GetTagValueRequest],
        Union[tag_values.TagValue, Awaitable[tag_values.TagValue]],
    ]:
        """Callable for the get-tag-value RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def get_namespaced_tag_value(
        self,
    ) -> Callable[
        [tag_values.GetNamespacedTagValueRequest],
        Union[tag_values.TagValue, Awaitable[tag_values.TagValue]],
    ]:
        """Callable for the get-namespaced-tag-value RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def create_tag_value(
        self,
    ) -> Callable[
        [tag_values.CreateTagValueRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Callable for the create-tag-value RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def update_tag_value(
        self,
    ) -> Callable[
        [tag_values.UpdateTagValueRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Callable for the update-tag-value RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def delete_tag_value(
        self,
    ) -> Callable[
        [tag_values.DeleteTagValueRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Callable for the delete-tag-value RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def get_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.GetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the get-IAM-policy RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def set_iam_policy(
        self,
    ) -> Callable[
        [iam_policy_pb2.SetIamPolicyRequest],
        Union[policy_pb2.Policy, Awaitable[policy_pb2.Policy]],
    ]:
        """Callable for the set-IAM-policy RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def test_iam_permissions(
        self,
    ) -> Callable[
        [iam_policy_pb2.TestIamPermissionsRequest],
        Union[
            iam_policy_pb2.TestIamPermissionsResponse,
            Awaitable[iam_policy_pb2.TestIamPermissionsResponse],
        ],
    ]:
        """Callable for the test-IAM-permissions RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def get_operation(
        self,
    ) -> Callable[
        [operations_pb2.GetOperationRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Callable for the get-operation RPC; not implemented here."""
        raise NotImplementedError()

    @property
    def kind(self) -> str:
        """String naming the kind of transport; not implemented here."""
        raise NotImplementedError()

318 

319 

# Public API of this module: only the abstract transport class is exported.
__all__ = (
    "TagValuesTransport",
)