1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import json
17import logging as std_logging
18import pickle
19import warnings
20from typing import Callable, Dict, Optional, Sequence, Tuple, Union
21
22from google.api_core import grpc_helpers
23from google.api_core import gapic_v1
24import google.auth # type: ignore
25from google.auth import credentials as ga_credentials # type: ignore
26from google.auth.transport.grpc import SslCredentials # type: ignore
27from google.protobuf.json_format import MessageToJson
28import google.protobuf.message
29
30import grpc # type: ignore
31import proto # type: ignore
32
33from google.cloud.logging_v2.types import logging_metrics
34from google.longrunning import operations_pb2 # type: ignore
35from google.protobuf import empty_pb2 # type: ignore
36from .base import MetricsServiceV2Transport, DEFAULT_CLIENT_INFO
37
38try:
39 from google.api_core import client_logging # type: ignore
40
41 CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
42except ImportError: # pragma: NO COVER
43 CLIENT_LOGGING_SUPPORTED = False
44
45_LOGGER = std_logging.getLogger(__name__)
46
47
48class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
49 def intercept_unary_unary(self, continuation, client_call_details, request):
50 logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
51 std_logging.DEBUG
52 )
53 if logging_enabled: # pragma: NO COVER
54 request_metadata = client_call_details.metadata
55 if isinstance(request, proto.Message):
56 request_payload = type(request).to_json(request)
57 elif isinstance(request, google.protobuf.message.Message):
58 request_payload = MessageToJson(request)
59 else:
60 request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"
61
62 request_metadata = {
63 key: value.decode("utf-8") if isinstance(value, bytes) else value
64 for key, value in request_metadata
65 }
66 grpc_request = {
67 "payload": request_payload,
68 "requestMethod": "grpc",
69 "metadata": dict(request_metadata),
70 }
71 _LOGGER.debug(
72 f"Sending request for {client_call_details.method}",
73 extra={
74 "serviceName": "google.logging.v2.MetricsServiceV2",
75 "rpcName": str(client_call_details.method),
76 "request": grpc_request,
77 "metadata": grpc_request["metadata"],
78 },
79 )
80 response = continuation(client_call_details, request)
81 if logging_enabled: # pragma: NO COVER
82 response_metadata = response.trailing_metadata()
83 # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
84 metadata = (
85 dict([(k, str(v)) for k, v in response_metadata])
86 if response_metadata
87 else None
88 )
89 result = response.result()
90 if isinstance(result, proto.Message):
91 response_payload = type(result).to_json(result)
92 elif isinstance(result, google.protobuf.message.Message):
93 response_payload = MessageToJson(result)
94 else:
95 response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
96 grpc_response = {
97 "payload": response_payload,
98 "metadata": metadata,
99 "status": "OK",
100 }
101 _LOGGER.debug(
102 f"Received response for {client_call_details.method}.",
103 extra={
104 "serviceName": "google.logging.v2.MetricsServiceV2",
105 "rpcName": client_call_details.method,
106 "response": grpc_response,
107 "metadata": grpc_response["metadata"],
108 },
109 )
110 return response
111
112
113class MetricsServiceV2GrpcTransport(MetricsServiceV2Transport):
114 """gRPC backend transport for MetricsServiceV2.
115
116 Service for configuring logs-based metrics.
117
118 This class defines the same methods as the primary client, so the
119 primary client can load the underlying transport implementation
120 and call it.
121
122 It sends protocol buffers over the wire using gRPC (which is built on
123 top of HTTP/2); the ``grpcio`` package must be installed.
124 """
125
126 _stubs: Dict[str, Callable]
127
128 def __init__(
129 self,
130 *,
131 host: str = "logging.googleapis.com",
132 credentials: Optional[ga_credentials.Credentials] = None,
133 credentials_file: Optional[str] = None,
134 scopes: Optional[Sequence[str]] = None,
135 channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
136 api_mtls_endpoint: Optional[str] = None,
137 client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
138 ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
139 client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
140 quota_project_id: Optional[str] = None,
141 client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
142 always_use_jwt_access: Optional[bool] = False,
143 api_audience: Optional[str] = None,
144 ) -> None:
145 """Instantiate the transport.
146
147 Args:
148 host (Optional[str]):
149 The hostname to connect to (default: 'logging.googleapis.com').
150 credentials (Optional[google.auth.credentials.Credentials]): The
151 authorization credentials to attach to requests. These
152 credentials identify the application to the service; if none
153 are specified, the client will attempt to ascertain the
154 credentials from the environment.
155 This argument is ignored if a ``channel`` instance is provided.
156 credentials_file (Optional[str]): A file with credentials that can
157 be loaded with :func:`google.auth.load_credentials_from_file`.
158 This argument is ignored if a ``channel`` instance is provided.
159 scopes (Optional(Sequence[str])): A list of scopes. This argument is
160 ignored if a ``channel`` instance is provided.
161 channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
162 A ``Channel`` instance through which to make calls, or a Callable
163 that constructs and returns one. If set to None, ``self.create_channel``
164 is used to create the channel. If a Callable is given, it will be called
165 with the same arguments as used in ``self.create_channel``.
166 api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
167 If provided, it overrides the ``host`` argument and tries to create
168 a mutual TLS channel with client SSL credentials from
169 ``client_cert_source`` or application default SSL credentials.
170 client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
171 Deprecated. A callback to provide client SSL certificate bytes and
172 private key bytes, both in PEM format. It is ignored if
173 ``api_mtls_endpoint`` is None.
174 ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
175 for the grpc channel. It is ignored if a ``channel`` instance is provided.
176 client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
177 A callback to provide client certificate bytes and private key bytes,
178 both in PEM format. It is used to configure a mutual TLS channel. It is
179 ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
180 quota_project_id (Optional[str]): An optional project to use for billing
181 and quota.
182 client_info (google.api_core.gapic_v1.client_info.ClientInfo):
183 The client info used to send a user-agent string along with
184 API requests. If ``None``, then default info will be used.
185 Generally, you only need to set this if you're developing
186 your own client library.
187 always_use_jwt_access (Optional[bool]): Whether self signed JWT should
188 be used for service account credentials.
189
190 Raises:
191 google.auth.exceptions.MutualTLSChannelError: If mutual TLS transport
192 creation failed for any reason.
193 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
194 and ``credentials_file`` are passed.
195 """
196 self._grpc_channel = None
197 self._ssl_channel_credentials = ssl_channel_credentials
198 self._stubs: Dict[str, Callable] = {}
199
200 if api_mtls_endpoint:
201 warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
202 if client_cert_source:
203 warnings.warn("client_cert_source is deprecated", DeprecationWarning)
204
205 if isinstance(channel, grpc.Channel):
206 # Ignore credentials if a channel was passed.
207 credentials = None
208 self._ignore_credentials = True
209 # If a channel was explicitly provided, set it.
210 self._grpc_channel = channel
211 self._ssl_channel_credentials = None
212
213 else:
214 if api_mtls_endpoint:
215 host = api_mtls_endpoint
216
217 # Create SSL credentials with client_cert_source or application
218 # default SSL credentials.
219 if client_cert_source:
220 cert, key = client_cert_source()
221 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
222 certificate_chain=cert, private_key=key
223 )
224 else:
225 self._ssl_channel_credentials = SslCredentials().ssl_credentials
226
227 else:
228 if client_cert_source_for_mtls and not ssl_channel_credentials:
229 cert, key = client_cert_source_for_mtls()
230 self._ssl_channel_credentials = grpc.ssl_channel_credentials(
231 certificate_chain=cert, private_key=key
232 )
233
234 # The base transport sets the host, credentials and scopes
235 super().__init__(
236 host=host,
237 credentials=credentials,
238 credentials_file=credentials_file,
239 scopes=scopes,
240 quota_project_id=quota_project_id,
241 client_info=client_info,
242 always_use_jwt_access=always_use_jwt_access,
243 api_audience=api_audience,
244 )
245
246 if not self._grpc_channel:
247 # initialize with the provided callable or the default channel
248 channel_init = channel or type(self).create_channel
249 self._grpc_channel = channel_init(
250 self._host,
251 # use the credentials which are saved
252 credentials=self._credentials,
253 # Set ``credentials_file`` to ``None`` here as
254 # the credentials that we saved earlier should be used.
255 credentials_file=None,
256 scopes=self._scopes,
257 ssl_credentials=self._ssl_channel_credentials,
258 quota_project_id=quota_project_id,
259 options=[
260 ("grpc.max_send_message_length", -1),
261 ("grpc.max_receive_message_length", -1),
262 ],
263 )
264
265 self._interceptor = _LoggingClientInterceptor()
266 self._logged_channel = grpc.intercept_channel(
267 self._grpc_channel, self._interceptor
268 )
269
270 # Wrap messages. This must be done after self._logged_channel exists
271 self._prep_wrapped_messages(client_info)
272
273 @classmethod
274 def create_channel(
275 cls,
276 host: str = "logging.googleapis.com",
277 credentials: Optional[ga_credentials.Credentials] = None,
278 credentials_file: Optional[str] = None,
279 scopes: Optional[Sequence[str]] = None,
280 quota_project_id: Optional[str] = None,
281 **kwargs,
282 ) -> grpc.Channel:
283 """Create and return a gRPC channel object.
284 Args:
285 host (Optional[str]): The host for the channel to use.
286 credentials (Optional[~.Credentials]): The
287 authorization credentials to attach to requests. These
288 credentials identify this application to the service. If
289 none are specified, the client will attempt to ascertain
290 the credentials from the environment.
291 credentials_file (Optional[str]): A file with credentials that can
292 be loaded with :func:`google.auth.load_credentials_from_file`.
293 This argument is mutually exclusive with credentials.
294 scopes (Optional[Sequence[str]]): A optional list of scopes needed for this
295 service. These are only used when credentials are not specified and
296 are passed to :func:`google.auth.default`.
297 quota_project_id (Optional[str]): An optional project to use for billing
298 and quota.
299 kwargs (Optional[dict]): Keyword arguments, which are passed to the
300 channel creation.
301 Returns:
302 grpc.Channel: A gRPC channel object.
303
304 Raises:
305 google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials``
306 and ``credentials_file`` are passed.
307 """
308
309 return grpc_helpers.create_channel(
310 host,
311 credentials=credentials,
312 credentials_file=credentials_file,
313 quota_project_id=quota_project_id,
314 default_scopes=cls.AUTH_SCOPES,
315 scopes=scopes,
316 default_host=cls.DEFAULT_HOST,
317 **kwargs,
318 )
319
320 @property
321 def grpc_channel(self) -> grpc.Channel:
322 """Return the channel designed to connect to this service."""
323 return self._grpc_channel
324
325 @property
326 def list_log_metrics(
327 self,
328 ) -> Callable[
329 [logging_metrics.ListLogMetricsRequest], logging_metrics.ListLogMetricsResponse
330 ]:
331 r"""Return a callable for the list log metrics method over gRPC.
332
333 Lists logs-based metrics.
334
335 Returns:
336 Callable[[~.ListLogMetricsRequest],
337 ~.ListLogMetricsResponse]:
338 A function that, when called, will call the underlying RPC
339 on the server.
340 """
341 # Generate a "stub function" on-the-fly which will actually make
342 # the request.
343 # gRPC handles serialization and deserialization, so we just need
344 # to pass in the functions for each.
345 if "list_log_metrics" not in self._stubs:
346 self._stubs["list_log_metrics"] = self._logged_channel.unary_unary(
347 "/google.logging.v2.MetricsServiceV2/ListLogMetrics",
348 request_serializer=logging_metrics.ListLogMetricsRequest.serialize,
349 response_deserializer=logging_metrics.ListLogMetricsResponse.deserialize,
350 )
351 return self._stubs["list_log_metrics"]
352
353 @property
354 def get_log_metric(
355 self,
356 ) -> Callable[[logging_metrics.GetLogMetricRequest], logging_metrics.LogMetric]:
357 r"""Return a callable for the get log metric method over gRPC.
358
359 Gets a logs-based metric.
360
361 Returns:
362 Callable[[~.GetLogMetricRequest],
363 ~.LogMetric]:
364 A function that, when called, will call the underlying RPC
365 on the server.
366 """
367 # Generate a "stub function" on-the-fly which will actually make
368 # the request.
369 # gRPC handles serialization and deserialization, so we just need
370 # to pass in the functions for each.
371 if "get_log_metric" not in self._stubs:
372 self._stubs["get_log_metric"] = self._logged_channel.unary_unary(
373 "/google.logging.v2.MetricsServiceV2/GetLogMetric",
374 request_serializer=logging_metrics.GetLogMetricRequest.serialize,
375 response_deserializer=logging_metrics.LogMetric.deserialize,
376 )
377 return self._stubs["get_log_metric"]
378
379 @property
380 def create_log_metric(
381 self,
382 ) -> Callable[[logging_metrics.CreateLogMetricRequest], logging_metrics.LogMetric]:
383 r"""Return a callable for the create log metric method over gRPC.
384
385 Creates a logs-based metric.
386
387 Returns:
388 Callable[[~.CreateLogMetricRequest],
389 ~.LogMetric]:
390 A function that, when called, will call the underlying RPC
391 on the server.
392 """
393 # Generate a "stub function" on-the-fly which will actually make
394 # the request.
395 # gRPC handles serialization and deserialization, so we just need
396 # to pass in the functions for each.
397 if "create_log_metric" not in self._stubs:
398 self._stubs["create_log_metric"] = self._logged_channel.unary_unary(
399 "/google.logging.v2.MetricsServiceV2/CreateLogMetric",
400 request_serializer=logging_metrics.CreateLogMetricRequest.serialize,
401 response_deserializer=logging_metrics.LogMetric.deserialize,
402 )
403 return self._stubs["create_log_metric"]
404
405 @property
406 def update_log_metric(
407 self,
408 ) -> Callable[[logging_metrics.UpdateLogMetricRequest], logging_metrics.LogMetric]:
409 r"""Return a callable for the update log metric method over gRPC.
410
411 Creates or updates a logs-based metric.
412
413 Returns:
414 Callable[[~.UpdateLogMetricRequest],
415 ~.LogMetric]:
416 A function that, when called, will call the underlying RPC
417 on the server.
418 """
419 # Generate a "stub function" on-the-fly which will actually make
420 # the request.
421 # gRPC handles serialization and deserialization, so we just need
422 # to pass in the functions for each.
423 if "update_log_metric" not in self._stubs:
424 self._stubs["update_log_metric"] = self._logged_channel.unary_unary(
425 "/google.logging.v2.MetricsServiceV2/UpdateLogMetric",
426 request_serializer=logging_metrics.UpdateLogMetricRequest.serialize,
427 response_deserializer=logging_metrics.LogMetric.deserialize,
428 )
429 return self._stubs["update_log_metric"]
430
431 @property
432 def delete_log_metric(
433 self,
434 ) -> Callable[[logging_metrics.DeleteLogMetricRequest], empty_pb2.Empty]:
435 r"""Return a callable for the delete log metric method over gRPC.
436
437 Deletes a logs-based metric.
438
439 Returns:
440 Callable[[~.DeleteLogMetricRequest],
441 ~.Empty]:
442 A function that, when called, will call the underlying RPC
443 on the server.
444 """
445 # Generate a "stub function" on-the-fly which will actually make
446 # the request.
447 # gRPC handles serialization and deserialization, so we just need
448 # to pass in the functions for each.
449 if "delete_log_metric" not in self._stubs:
450 self._stubs["delete_log_metric"] = self._logged_channel.unary_unary(
451 "/google.logging.v2.MetricsServiceV2/DeleteLogMetric",
452 request_serializer=logging_metrics.DeleteLogMetricRequest.serialize,
453 response_deserializer=empty_pb2.Empty.FromString,
454 )
455 return self._stubs["delete_log_metric"]
456
457 def close(self):
458 self._logged_channel.close()
459
460 @property
461 def cancel_operation(
462 self,
463 ) -> Callable[[operations_pb2.CancelOperationRequest], None]:
464 r"""Return a callable for the cancel_operation method over gRPC."""
465 # Generate a "stub function" on-the-fly which will actually make
466 # the request.
467 # gRPC handles serialization and deserialization, so we just need
468 # to pass in the functions for each.
469 if "cancel_operation" not in self._stubs:
470 self._stubs["cancel_operation"] = self._logged_channel.unary_unary(
471 "/google.longrunning.Operations/CancelOperation",
472 request_serializer=operations_pb2.CancelOperationRequest.SerializeToString,
473 response_deserializer=None,
474 )
475 return self._stubs["cancel_operation"]
476
477 @property
478 def get_operation(
479 self,
480 ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
481 r"""Return a callable for the get_operation method over gRPC."""
482 # Generate a "stub function" on-the-fly which will actually make
483 # the request.
484 # gRPC handles serialization and deserialization, so we just need
485 # to pass in the functions for each.
486 if "get_operation" not in self._stubs:
487 self._stubs["get_operation"] = self._logged_channel.unary_unary(
488 "/google.longrunning.Operations/GetOperation",
489 request_serializer=operations_pb2.GetOperationRequest.SerializeToString,
490 response_deserializer=operations_pb2.Operation.FromString,
491 )
492 return self._stubs["get_operation"]
493
494 @property
495 def list_operations(
496 self,
497 ) -> Callable[
498 [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
499 ]:
500 r"""Return a callable for the list_operations method over gRPC."""
501 # Generate a "stub function" on-the-fly which will actually make
502 # the request.
503 # gRPC handles serialization and deserialization, so we just need
504 # to pass in the functions for each.
505 if "list_operations" not in self._stubs:
506 self._stubs["list_operations"] = self._logged_channel.unary_unary(
507 "/google.longrunning.Operations/ListOperations",
508 request_serializer=operations_pb2.ListOperationsRequest.SerializeToString,
509 response_deserializer=operations_pb2.ListOperationsResponse.FromString,
510 )
511 return self._stubs["list_operations"]
512
513 @property
514 def kind(self) -> str:
515 return "grpc"
516
517
518__all__ = ("MetricsServiceV2GrpcTransport",)