Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/operations_v1/transports/base.py: 58%

48 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 07:30 +0000

1# -*- coding: utf-8 -*- 

2# Copyright 2020 Google LLC 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16import abc 

17from typing import Awaitable, Callable, Optional, Sequence, Union 

18 

19import google.api_core # type: ignore 

20from google.api_core import exceptions as core_exceptions # type: ignore 

21from google.api_core import gapic_v1 # type: ignore 

22from google.api_core import retry as retries # type: ignore 

23from google.api_core import version 

24import google.auth # type: ignore 

25from google.auth import credentials as ga_credentials # type: ignore 

26from google.longrunning import operations_pb2 

27from google.oauth2 import service_account # type: ignore 

28from google.protobuf import empty_pb2 # type: ignore 

29 

30 

# Default client info for transports in this module; it reports the installed
# ``google.api_core`` version as the GAPIC version.
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(gapic_version=version.__version__)

34 

35 

class OperationsTransport(abc.ABC):
    """Abstract transport class for Operations.

    The base class resolves the host and credentials and pre-wraps the four
    long-running-operations RPCs with default retry and timeout settings.
    The RPC properties and ``close`` raise ``NotImplementedError`` here and
    are expected to be overridden by concrete transports.
    """

    AUTH_SCOPES = ()

    DEFAULT_HOST: str = "longrunning.googleapis.com"

    def __init__(
        self,
        *,
        host: str = DEFAULT_HOST,
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        **kwargs,
    ) -> None:
        """Instantiate the transport.

        Args:
            host (Optional[str]):
                The hostname to connect to. If it contains no ``:``, port
                443 is appended.
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. If neither
                this nor ``credentials_file`` is given, credentials are
                obtained from :func:`google.auth.default`.
            credentials_file (Optional[str]): A file with credentials that can
                be loaded with :func:`google.auth.load_credentials_from_file`.
                This argument is mutually exclusive with credentials.
            scopes (Optional[Sequence[str]]): A list of scopes. Passed to the
                credential loaders together with ``AUTH_SCOPES`` as the
                default scopes.
            quota_project_id (Optional[str]): An optional project to use for
                billing and quota. Only forwarded when credentials are loaded
                from a file or from the environment.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                Accepted for interface compatibility; this initializer does
                not read it. It is supplied separately to
                ``_prep_wrapped_messages``.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            **kwargs: Ignored by this initializer.

        Raises:
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """
        # Save the hostname. Default to port 443 (HTTPS) if none is specified.
        if ":" not in host:
            host += ":443"
        self._host = host

        scopes_kwargs = {"scopes": scopes, "default_scopes": self.AUTH_SCOPES}

        # Save the scopes.
        self._scopes = scopes

        # An explicit credentials object and a credentials file cannot both
        # be honoured, so reject the combination outright.
        if credentials and credentials_file:
            raise core_exceptions.DuplicateCredentialArgs(
                "'credentials_file' and 'credentials' are mutually exclusive"
            )

        # If no credentials object is provided, load one from the given file
        # or fall back to the application default credentials.
        if credentials_file is not None:
            credentials, _ = google.auth.load_credentials_from_file(
                credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
            )
        elif credentials is None:
            credentials, _ = google.auth.default(
                **scopes_kwargs, quota_project_id=quota_project_id
            )

        # Opt service account credentials into self signed JWT when requested.
        # The hasattr check guards against google-auth versions whose
        # Credentials class lacks ``with_always_use_jwt_access``.
        if (
            always_use_jwt_access
            and isinstance(credentials, service_account.Credentials)
            and hasattr(service_account.Credentials, "with_always_use_jwt_access")
        ):
            credentials = credentials.with_always_use_jwt_access(True)

        # Save the credentials.
        self._credentials = credentials

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped RPC callables.

        Populates ``self._wrapped_methods``, a dict keyed by each raw RPC
        callable whose value is that callable wrapped by
        ``gapic_v1.method.wrap_method``. All four RPCs share the same
        defaults: retry on ``ServiceUnavailable`` with exponential backoff
        (initial 0.5, multiplier 2.0, maximum 10.0, deadline 10.0) and a
        default timeout of 10.0.

        Args:
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info forwarded to ``wrap_method`` for every RPC.
        """
        # Resolve each property once; the same callable is both the dict key
        # and the function being wrapped.
        rpcs = (
            self.list_operations,
            self.get_operation,
            self.delete_operation,
            self.cancel_operation,
        )

        # A fresh Retry object is built for every RPC so the wrapped methods
        # do not share a retry instance.
        self._wrapped_methods = {
            rpc: gapic_v1.method.wrap_method(
                rpc,
                default_retry=retries.Retry(
                    initial=0.5,
                    maximum=10.0,
                    multiplier=2.0,
                    predicate=retries.if_exception_type(
                        core_exceptions.ServiceUnavailable,
                    ),
                    deadline=10.0,
                ),
                default_timeout=10.0,
                client_info=client_info,
            )
            for rpc in rpcs
        }

    def close(self):
        """Closes resources associated with the transport.

        .. warning::
             Only call this method if the transport is NOT shared
             with other clients - this may cause errors in other clients!

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest],
        Union[
            operations_pb2.ListOperationsResponse,
            Awaitable[operations_pb2.ListOperationsResponse],
        ],
    ]:
        """Return the callable for the ``ListOperations`` RPC.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def get_operation(
        self,
    ) -> Callable[
        [operations_pb2.GetOperationRequest],
        Union[operations_pb2.Operation, Awaitable[operations_pb2.Operation]],
    ]:
        """Return the callable for the ``GetOperation`` RPC.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def delete_operation(
        self,
    ) -> Callable[
        [operations_pb2.DeleteOperationRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Return the callable for the ``DeleteOperation`` RPC.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @property
    def cancel_operation(
        self,
    ) -> Callable[
        [operations_pb2.CancelOperationRequest],
        Union[empty_pb2.Empty, Awaitable[empty_pb2.Empty]],
    ]:
        """Return the callable for the ``CancelOperation`` RPC.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

225 

226 

# Names exported by ``from <module> import *``.
__all__ = (
    "OperationsTransport",
)