Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/auth/transport/grpc.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

61 statements  

1# Copyright 2016 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Authorization support for gRPC.""" 

16 

17from __future__ import absolute_import 

18 

19import logging 

20 

21from google.auth import exceptions 

22from google.auth.transport import _mtls_helper 

23from google.oauth2 import service_account 

24 

try:
    import grpc  # type: ignore
except ImportError as caught_exc:  # pragma: NO COVER
    # Re-raise with an actionable message, keeping the original ImportError
    # attached as the cause so the underlying failure is still visible.
    raise ImportError(
        "gRPC is not installed, please install the grpcio package to use the gRPC transport."
    ) from caught_exc

31 

32_LOGGER = logging.getLogger(__name__) 

33 

34 

class AuthMetadataPlugin(grpc.AuthMetadataPlugin):
    """`gRPC AuthMetadataPlugin`_ that attaches credential headers to every
    request.

    .. _gRPC AuthMetadataPlugin:
        http://www.grpc.io/grpc/python/grpc.html#grpc.AuthMetadataPlugin

    Args:
        credentials (google.auth.credentials.Credentials): The credentials
            whose headers are added to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object handed to the credentials so they can refresh as needed.
        default_host (Optional[str]): A host like "pubsub.googleapis.com".
            When the credentials are service account credentials, it is
            used to build the audience for a self-signed JWT.
    """

    def __init__(self, credentials, request, default_host=None):
        # pylint: disable=no-value-for-parameter
        # pylint doesn't realize that the super method takes no arguments
        # because this class is the same name as the superclass.
        super(AuthMetadataPlugin, self).__init__()
        self._default_host = default_host
        self._request = request
        self._credentials = credentials

    def _get_authorization_headers(self, context):
        """Builds the authorization headers for one RPC.

        Args:
            context: The RPC context; its ``method_name`` and
                ``service_url`` attributes are forwarded to the credentials.

        Returns:
            Sequence[Tuple[str, str]]: A list of request headers (key, value)
                to add to the request.
        """
        creds = self._credentials

        # https://google.aip.dev/auth/4111
        # Service account credentials are asked to use a self-signed JWT.
        # The audience comes only from the explicitly supplied default host,
        # because it cannot always be derived from context.service_url.
        if isinstance(creds, service_account.Credentials):
            audience = None
            if self._default_host:
                audience = "https://{}/".format(self._default_host)
            creds._create_self_signed_jwt(audience)

        # before_request() fills the mapping in place.
        metadata = {}
        creds.before_request(
            self._request, context.method_name, context.service_url, metadata
        )
        return list(metadata.items())

    def __call__(self, context, callback):
        """Hands the authorization metadata to the given callback.

        Args:
            context (grpc.AuthMetadataContext): The RPC context.
            callback (grpc.AuthMetadataPluginCallback): The callback that
                receives the authorization metadata; it is invoked with
                ``None`` as its second argument.
        """
        metadata = self._get_authorization_headers(context)
        callback(metadata, None)

94 

95 

def secure_authorized_channel(
    credentials,
    request,
    target,
    ssl_credentials=None,
    client_cert_callback=None,
    **kwargs
):
    """Creates a secure authorized gRPC channel.

    This creates a channel with SSL and :class:`AuthMetadataPlugin`. This
    channel can be used to create a stub that can make authorized requests.
    Users can configure client certificate or rely on device certificates to
    establish a mutual TLS channel, if the `GOOGLE_API_USE_CLIENT_CERTIFICATE`
    variable is explicitly set to `true`.

    Note that the positional order is ``(credentials, request, target)``.

    Example::

        import google.auth
        import google.auth.transport.grpc
        import google.auth.transport.requests
        from google.cloud.speech.v1 import cloud_speech_pb2

        # Get credentials.
        credentials, _ = google.auth.default()

        # Get an HTTP request function to refresh credentials.
        request = google.auth.transport.requests.Request()

        # Create a channel.
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, 'speech.googleapis.com:443',
            ssl_credentials=grpc.ssl_channel_credentials())

        # Use the channel to create a stub.
        cloud_speech.create_Speech_stub(channel)

    Usage:

    There are actually a couple of options to create a channel, depending on if
    you want to create a regular or mutual TLS channel.

    First let's list the endpoints (regular vs mutual TLS) to choose from::

        regular_endpoint = 'speech.googleapis.com:443'
        mtls_endpoint = 'speech.mtls.googleapis.com:443'

    Option 1: create a regular (non-mutual) TLS channel by explicitly setting
    the ssl_credentials::

        regular_ssl_credentials = grpc.ssl_channel_credentials()

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, regular_endpoint,
            ssl_credentials=regular_ssl_credentials)

    Option 2: create a mutual TLS channel by calling a callback which returns
    the client side certificate and the key (Note that
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
    set to `true`)::

        def my_client_cert_callback():
            code_to_load_client_cert_and_key()
            if loaded:
                return (pem_cert_bytes, pem_key_bytes)
            raise MyClientCertFailureException()

        try:
            channel = google.auth.transport.grpc.secure_authorized_channel(
                credentials, request, mtls_endpoint,
                client_cert_callback=my_client_cert_callback)
        except MyClientCertFailureException:
            # handle the exception

    Option 3: use application default SSL credentials. It searches and uses
    the command in a context aware metadata file, which is available on devices
    with endpoint verification support (Note that
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable must be explicitly
    set to `true`).
    See https://cloud.google.com/endpoint-verification/docs/overview::

        try:
            default_ssl_credentials = SslCredentials()
        except:
            # Exception can be raised if the context aware metadata is malformed.
            # See :class:`SslCredentials` for the possible exceptions.

        # Choose the endpoint based on the SSL credentials type.
        if default_ssl_credentials.is_mtls:
            endpoint_to_use = mtls_endpoint
        else:
            endpoint_to_use = regular_endpoint
        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, endpoint_to_use,
            ssl_credentials=default_ssl_credentials.ssl_credentials)

    Option 4: not setting ssl_credentials and client_cert_callback. For devices
    without endpoint verification support, or when the
    `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not `true`, a
    regular TLS channel is created; otherwise, a mutual TLS channel is created.
    In the latter case the call should be wrapped in a try/except block in
    case of malformed context aware metadata.

    The following code uses regular_endpoint, it works the same no matter the
    created channel is regular or mutual TLS. Regular endpoint ignores client
    certificate and key::

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, regular_endpoint)

    The following code uses mtls_endpoint, if the created channel is regular,
    and API mtls_endpoint is configured to require client SSL credentials, API
    calls using this channel will be rejected::

        channel = google.auth.transport.grpc.secure_authorized_channel(
            credentials, request, mtls_endpoint)

    Args:
        credentials (google.auth.credentials.Credentials): The credentials to
            add to requests.
        request (google.auth.transport.Request): A HTTP transport request
            object used to refresh credentials as needed. Even though gRPC
            is a separate transport, there's no way to refresh the credentials
            without using a standard http transport.
        target (str): The host and port of the service.
        ssl_credentials (grpc.ChannelCredentials): Optional SSL channel
            credentials. This can be used to specify different certificates.
            This argument is mutually exclusive with client_cert_callback;
            providing both will raise an exception.
            If ssl_credentials and client_cert_callback are None, application
            default SSL credentials are used if `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable is explicitly set to `true`, otherwise one way TLS
            SSL credentials are used.
        client_cert_callback (Callable[[], (bytes, bytes)]): Optional
            callback function to obtain client certificate and key for mutual TLS
            connection. This argument is mutually exclusive with
            ssl_credentials; providing both will raise an exception.
            This argument does nothing unless `GOOGLE_API_USE_CLIENT_CERTIFICATE`
            environment variable is explicitly set to `true`.
        kwargs: Additional arguments to pass to :func:`grpc.secure_channel`.

    Returns:
        grpc.Channel: The created gRPC channel.

    Raises:
        google.auth.exceptions.MalformedError: If both ssl_credentials and
            client_cert_callback are provided.
        google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
            creation failed for any reason.
    """
    # Validate the arguments first so that an invalid combination fails
    # before any gRPC objects are built.
    if ssl_credentials and client_cert_callback:
        raise exceptions.MalformedError(
            "Received both ssl_credentials and client_cert_callback; "
            "these are mutually exclusive."
        )

    # Create the metadata plugin for inserting the authorization header.
    metadata_plugin = AuthMetadataPlugin(credentials, request)

    # Create a set of grpc.CallCredentials using the metadata plugin.
    google_auth_credentials = grpc.metadata_call_credentials(metadata_plugin)

    # If SSL credentials are not explicitly set, try client_cert_callback and ADC.
    if not ssl_credentials:
        use_client_cert = _mtls_helper.check_use_client_cert()
        if use_client_cert and client_cert_callback:
            # Use the callback if provided.
            cert, key = client_cert_callback()
            ssl_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        elif use_client_cert:
            # Use application default SSL credentials.
            adc_ssl_credentials = SslCredentials()
            ssl_credentials = adc_ssl_credentials.ssl_credentials
        else:
            # Client certificates are not enabled: plain one-way TLS.
            ssl_credentials = grpc.ssl_channel_credentials()

    # Combine the ssl credentials and the authorization credentials.
    composite_credentials = grpc.composite_channel_credentials(
        ssl_credentials, google_auth_credentials
    )

    return grpc.secure_channel(target, composite_credentials, **kwargs)

277 

278 

class SslCredentials:
    """Application default SSL credentials.

    The behavior is controlled by `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment
    variable whose default value is `false`. Client certificate will not be used
    unless the environment variable is explicitly set to `true`. See
    https://google.aip.dev/auth/4114

    If the environment variable is `true`, then for devices with endpoint verification
    support, a device certificate will be automatically loaded and mutual TLS will
    be established.
    See https://cloud.google.com/endpoint-verification/docs/overview.
    """

    def __init__(self):
        if _mtls_helper.check_use_client_cert():
            # Client certificates are enabled: mutual TLS is possible only
            # when the context aware metadata path check returns a path.
            config_path = _mtls_helper._check_config_path(
                _mtls_helper.CONTEXT_AWARE_METADATA_PATH
            )
            self._is_mtls = config_path is not None
        else:
            self._is_mtls = False

    @property
    def ssl_credentials(self):
        """Build and return the SSL channel credentials.

        The credentials are rebuilt on every access. For devices with
        endpoint verification support, if the device certificate loading has
        any problems, corresponding exceptions will be raised. For a device
        without endpoint verification support, no exceptions will be raised.

        Returns:
            grpc.ChannelCredentials: The created grpc channel credentials.

        Raises:
            google.auth.exceptions.MutualTLSChannelError: If mutual TLS channel
                creation failed for any reason.
        """
        if not self._is_mtls:
            # No client certificate involved: default one-way TLS credentials.
            self._ssl_credentials = grpc.ssl_channel_credentials()
            return self._ssl_credentials

        try:
            # Only the certificate and key (2nd and 3rd items) are needed.
            _, cert, key, _ = _mtls_helper.get_client_ssl_credentials()
            self._ssl_credentials = grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )
        except exceptions.ClientCertError as caught_exc:
            # Surface certificate loading problems as a mutual TLS failure.
            raise exceptions.MutualTLSChannelError(caught_exc) from caught_exc

        return self._ssl_credentials

    @property
    def is_mtls(self):
        """Indicates if the created SSL channel credentials is mutual TLS."""
        return self._is_mtls