1# -*- coding: utf-8 -*-
2# Copyright 2025 Google LLC
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16import inspect
17import json
18import pickle
19import logging as std_logging
20import warnings
21from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union
22
23from google.api_core import gapic_v1
24from google.api_core import grpc_helpers_async
25from google.api_core import exceptions as core_exceptions
26from google.api_core import retry_async as retries
27from google.auth import credentials as ga_credentials # type: ignore
28from google.auth.transport.grpc import SslCredentials # type: ignore
29from google.protobuf.json_format import MessageToJson
30import google.protobuf.message
31
32import grpc # type: ignore
33import proto # type: ignore
34from grpc.experimental import aio # type: ignore
35
36from google.cloud.logging_v2.types import logging_metrics
37from google.longrunning import operations_pb2 # type: ignore
38from google.protobuf import empty_pb2 # type: ignore
39from .base import MetricsServiceV2Transport, DEFAULT_CLIENT_INFO
40from .grpc import MetricsServiceV2GrpcTransport
41
# ``client_logging`` may be missing from the installed google-api-core, so
# probe for it once at import time and remember the outcome in a flag that
# the logging interceptor below consults before emitting anything.
try:
    from google.api_core import client_logging  # type: ignore
except ImportError:  # pragma: NO COVER
    CLIENT_LOGGING_SUPPORTED = False
else:
    CLIENT_LOGGING_SUPPORTED = True  # pragma: NO COVER

# Module-level logger shared by everything in this transport module.
_LOGGER = std_logging.getLogger(__name__)
50
51
class _LoggingClientAIOInterceptor(
    grpc.aio.UnaryUnaryClientInterceptor
):  # pragma: NO COVER
    """Unary-unary client interceptor that writes DEBUG records for each RPC.

    Logging only happens when ``CLIENT_LOGGING_SUPPORTED`` is true and
    ``_LOGGER`` is enabled for ``DEBUG``; otherwise the call is forwarded
    untouched.
    """

    @staticmethod
    def _render_payload(message):
        """Return a printable representation of ``message`` for a log record.

        Args:
            message: The request or response object of the RPC.

        Returns:
            str: JSON for proto-plus and protobuf messages; for anything
            else the type name followed by the pickled bytes.
        """
        if isinstance(message, proto.Message):
            return type(message).to_json(message)
        if isinstance(message, google.protobuf.message.Message):
            return MessageToJson(message)
        # NOTE: pickle is only used to *serialize* for the log line here;
        # nothing in this module ever unpickles the result.
        return f"{type(message).__name__}: {pickle.dumps(message)}"

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        """Forward the call and, when enabled, log the request and response.

        Args:
            continuation: Callable that proceeds with the RPC; invoked with
                ``client_call_details`` and ``request``.
            client_call_details: Details of the outgoing call; its
                ``method`` and ``metadata`` attributes are read for logging.
            request: The request message of the RPC.

        Returns:
            The call object returned by ``continuation``. When logging is
            enabled it has already been awaited once to obtain the result.
        """
        logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
            std_logging.DEBUG
        )
        if logging_enabled:  # pragma: NO COVER
            request_payload = self._render_payload(request)

            # ``metadata`` is optional on the call details; treat ``None``
            # as "no metadata" so that debug logging can never fail the RPC.
            request_metadata = {
                key: value.decode("utf-8") if isinstance(value, bytes) else value
                for key, value in (client_call_details.metadata or ())
            }
            grpc_request = {
                "payload": request_payload,
                "requestMethod": "grpc",
                "metadata": dict(request_metadata),
            }
            _LOGGER.debug(
                f"Sending request for {client_call_details.method}",
                extra={
                    "serviceName": "google.logging.v2.MetricsServiceV2",
                    "rpcName": str(client_call_details.method),
                    "request": grpc_request,
                    "metadata": grpc_request["metadata"],
                },
            )
        response = await continuation(client_call_details, request)
        if logging_enabled:  # pragma: NO COVER
            response_metadata = await response.trailing_metadata()
            # Flatten the gRPC metadata object into a plain dict of strings;
            # empty or missing trailing metadata is logged as ``None``.
            metadata = (
                {key: str(value) for key, value in response_metadata}
                if response_metadata
                else None
            )
            result = await response
            grpc_response = {
                "payload": self._render_payload(result),
                "metadata": metadata,
                "status": "OK",
            }
            _LOGGER.debug(
                f"Received response to rpc {client_call_details.method}.",
                extra={
                    "serviceName": "google.logging.v2.MetricsServiceV2",
                    "rpcName": str(client_call_details.method),
                    "response": grpc_response,
                    "metadata": grpc_response["metadata"],
                },
            )
        return response
117
118
class MetricsServiceV2GrpcAsyncIOTransport(MetricsServiceV2Transport):
    """AsyncIO gRPC transport backend for MetricsServiceV2.

    Service for configuring logs-based metrics.

    The transport exposes one property per RPC, named like the matching
    method on the primary client, so the client can load this transport
    implementation and delegate its calls to it.

    Protocol buffers are sent over the wire with gRPC (which is built on
    top of HTTP/2); the ``grpcio`` package must be installed.
    """

    _grpc_channel: aio.Channel
    _stubs: Dict[str, Callable] = {}

    @classmethod
    def create_channel(
        cls,
        host: str = "logging.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        quota_project_id: Optional[str] = None,
        **kwargs,
    ) -> aio.Channel:
        """Build a gRPC AsyncIO channel for this service.

        Thin wrapper that forwards its arguments, together with the
        class-level ``AUTH_SCOPES`` and ``DEFAULT_HOST``, to
        ``grpc_helpers_async.create_channel``.

        Args:
            host (Optional[str]): The host for the channel to use.
            credentials (Optional[~.Credentials]): The authorization
                credentials to attach to requests. These credentials
                identify this application to the service. If none are
                specified, the client will attempt to ascertain the
                credentials from the environment.
            credentials_file (Optional[str]): Deprecated. A file with
                credentials that can be loaded with
                :func:`google.auth.load_credentials_from_file`. This
                argument will be removed in the next major version of this
                library.
            scopes (Optional[Sequence[str]]): An optional list of scopes
                needed for this service. These are only used when
                credentials are not specified and are passed to
                :func:`google.auth.default`.
            quota_project_id (Optional[str]): An optional project to use
                for billing and quota.
            kwargs (Optional[dict]): Extra keyword arguments handed to the
                channel creation call.

        Returns:
            aio.Channel: A gRPC AsyncIO channel object.
        """
        new_channel = grpc_helpers_async.create_channel(
            host,
            scopes=scopes,
            default_scopes=cls.AUTH_SCOPES,
            default_host=cls.DEFAULT_HOST,
            credentials=credentials,
            credentials_file=credentials_file,
            quota_project_id=quota_project_id,
            **kwargs,
        )
        return new_channel

    def __init__(
        self,
        *,
        host: str = "logging.googleapis.com",
        credentials: Optional[ga_credentials.Credentials] = None,
        credentials_file: Optional[str] = None,
        scopes: Optional[Sequence[str]] = None,
        channel: Optional[Union[aio.Channel, Callable[..., aio.Channel]]] = None,
        api_mtls_endpoint: Optional[str] = None,
        client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
        client_cert_source_for_mtls: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
        quota_project_id: Optional[str] = None,
        client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
        always_use_jwt_access: Optional[bool] = False,
        api_audience: Optional[str] = None,
    ) -> None:
        """Set up the transport and its underlying channel.

        Args:
            host (Optional[str]):
                The hostname to connect to (default: 'logging.googleapis.com').
            credentials (Optional[google.auth.credentials.Credentials]): The
                authorization credentials to attach to requests. These
                credentials identify the application to the service; if none
                are specified, the client will attempt to ascertain the
                credentials from the environment.
                Ignored when a ``channel`` instance is provided.
            credentials_file (Optional[str]): Deprecated. A file with
                credentials that can be loaded with
                :func:`google.auth.load_credentials_from_file`.
                Ignored when a ``channel`` instance is provided.
                This argument will be removed in the next major version of
                this library.
            scopes (Optional[Sequence[str]]): An optional list of scopes
                needed for this service. These are only used when
                credentials are not specified and are passed to
                :func:`google.auth.default`.
            channel (Optional[Union[aio.Channel, Callable[..., aio.Channel]]]):
                Either a ready ``Channel`` instance to make calls through,
                or a callable that builds and returns one. With ``None``,
                ``create_channel`` of this class is used. A callable is
                invoked with the same arguments ``create_channel`` would
                receive.
            api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS
                endpoint. If provided, it overrides the ``host`` argument
                and tries to create a mutual TLS channel with client SSL
                credentials from ``client_cert_source`` or application
                default SSL credentials.
            client_cert_source (Optional[Callable[[], Tuple[bytes, bytes]]]):
                Deprecated. A callback to provide client SSL certificate
                bytes and private key bytes, both in PEM format. It is
                ignored if ``api_mtls_endpoint`` is None.
            ssl_channel_credentials (grpc.ChannelCredentials): SSL
                credentials for the grpc channel. Ignored when a ``channel``
                instance is provided.
            client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
                A callback to provide client certificate bytes and private
                key bytes, both in PEM format. It is used to configure a
                mutual TLS channel. It is ignored if a ``channel`` instance
                or ``ssl_channel_credentials`` is provided.
            quota_project_id (Optional[str]): An optional project to use
                for billing and quota.
            client_info (google.api_core.gapic_v1.client_info.ClientInfo):
                The client info used to send a user-agent string along with
                API requests. If ``None``, then default info will be used.
                Generally, you only need to set this if you're developing
                your own client library.
            always_use_jwt_access (Optional[bool]): Whether self signed JWT
                should be used for service account credentials.
            api_audience (Optional[str]): Passed through unchanged to the
                base transport's constructor; not otherwise used here.

        Raises:
            google.auth.exceptions.MutualTlsChannelError: If mutual TLS
                transport creation failed for any reason.
            google.api_core.exceptions.DuplicateCredentialArgs: If both
                ``credentials`` and ``credentials_file`` are passed.
        """

        def _ssl_credentials_from(cert_source):
            # Turn a PEM (certificate, key) callback into gRPC SSL credentials.
            cert, key = cert_source()
            return grpc.ssl_channel_credentials(
                certificate_chain=cert, private_key=key
            )

        self._grpc_channel = None
        self._ssl_channel_credentials = ssl_channel_credentials
        self._stubs: Dict[str, Callable] = {}

        if api_mtls_endpoint:
            warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning)
        if client_cert_source:
            warnings.warn("client_cert_source is deprecated", DeprecationWarning)

        if isinstance(channel, aio.Channel):
            # A ready-made channel wins: credentials and SSL settings are
            # dropped because the channel already carries its own.
            credentials = None
            self._ignore_credentials = True
            self._grpc_channel = channel
            self._ssl_channel_credentials = None
        elif api_mtls_endpoint:
            # Deprecated mTLS path: the endpoint replaces the host and SSL
            # credentials come from the callback or the application default.
            host = api_mtls_endpoint
            if client_cert_source:
                self._ssl_channel_credentials = _ssl_credentials_from(
                    client_cert_source
                )
            else:
                self._ssl_channel_credentials = SslCredentials().ssl_credentials
        elif client_cert_source_for_mtls and not ssl_channel_credentials:
            self._ssl_channel_credentials = _ssl_credentials_from(
                client_cert_source_for_mtls
            )

        # The base transport resolves and stores host, credentials and scopes.
        super().__init__(
            host=host,
            credentials=credentials,
            credentials_file=credentials_file,
            scopes=scopes,
            quota_project_id=quota_project_id,
            client_info=client_info,
            always_use_jwt_access=always_use_jwt_access,
            api_audience=api_audience,
        )

        if not self._grpc_channel:
            # No channel instance was supplied: build one with the given
            # factory callable, or fall back to this class's default factory.
            make_channel = channel or type(self).create_channel
            channel_options = [
                ("grpc.max_send_message_length", -1),
                ("grpc.max_receive_message_length", -1),
            ]
            self._grpc_channel = make_channel(
                self._host,
                # Reuse the credentials saved by the base class; that is
                # also why ``credentials_file`` is deliberately ``None``.
                credentials=self._credentials,
                credentials_file=None,
                scopes=self._scopes,
                ssl_credentials=self._ssl_channel_credentials,
                quota_project_id=quota_project_id,
                options=channel_options,
            )

        self._interceptor = _LoggingClientAIOInterceptor()
        self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
        self._logged_channel = self._grpc_channel
        # Only pass ``kind`` to ``wrap_method`` when the installed version
        # of it actually accepts that parameter.
        wrap_parameters = inspect.signature(
            gapic_v1.method_async.wrap_method
        ).parameters
        self._wrap_with_kind = "kind" in wrap_parameters
        # Wrapping needs ``self._logged_channel``, so it has to come last.
        self._prep_wrapped_messages(client_info)

    @property
    def grpc_channel(self) -> aio.Channel:
        """Return the channel this transport sends its calls through.

        The channel is stored on the instance by ``__init__``, so repeated
        accesses return the same object.
        """
        cached_channel = self._grpc_channel
        return cached_channel

    def _cached_stub(self, name, rpc_path, request_serializer, response_deserializer):
        """Return the unary-unary stub stored under ``name``, creating it once.

        gRPC handles serialization and deserialization itself, so the stub
        only needs the two conversion hooks for the RPC at ``rpc_path``.
        """
        stubs = self._stubs
        if name not in stubs:
            stubs[name] = self._logged_channel.unary_unary(
                rpc_path,
                request_serializer=request_serializer,
                response_deserializer=response_deserializer,
            )
        return stubs[name]

    @property
    def list_log_metrics(
        self,
    ) -> Callable[
        [logging_metrics.ListLogMetricsRequest],
        Awaitable[logging_metrics.ListLogMetricsResponse],
    ]:
        r"""Return the gRPC callable for the list log metrics method.

        Lists logs-based metrics.

        Returns:
            Callable[[~.ListLogMetricsRequest],
                    Awaitable[~.ListLogMetricsResponse]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._cached_stub(
            "list_log_metrics",
            "/google.logging.v2.MetricsServiceV2/ListLogMetrics",
            logging_metrics.ListLogMetricsRequest.serialize,
            logging_metrics.ListLogMetricsResponse.deserialize,
        )

    @property
    def get_log_metric(
        self,
    ) -> Callable[
        [logging_metrics.GetLogMetricRequest], Awaitable[logging_metrics.LogMetric]
    ]:
        r"""Return the gRPC callable for the get log metric method.

        Gets a logs-based metric.

        Returns:
            Callable[[~.GetLogMetricRequest],
                    Awaitable[~.LogMetric]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._cached_stub(
            "get_log_metric",
            "/google.logging.v2.MetricsServiceV2/GetLogMetric",
            logging_metrics.GetLogMetricRequest.serialize,
            logging_metrics.LogMetric.deserialize,
        )

    @property
    def create_log_metric(
        self,
    ) -> Callable[
        [logging_metrics.CreateLogMetricRequest], Awaitable[logging_metrics.LogMetric]
    ]:
        r"""Return the gRPC callable for the create log metric method.

        Creates a logs-based metric.

        Returns:
            Callable[[~.CreateLogMetricRequest],
                    Awaitable[~.LogMetric]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._cached_stub(
            "create_log_metric",
            "/google.logging.v2.MetricsServiceV2/CreateLogMetric",
            logging_metrics.CreateLogMetricRequest.serialize,
            logging_metrics.LogMetric.deserialize,
        )

    @property
    def update_log_metric(
        self,
    ) -> Callable[
        [logging_metrics.UpdateLogMetricRequest], Awaitable[logging_metrics.LogMetric]
    ]:
        r"""Return the gRPC callable for the update log metric method.

        Creates or updates a logs-based metric.

        Returns:
            Callable[[~.UpdateLogMetricRequest],
                    Awaitable[~.LogMetric]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._cached_stub(
            "update_log_metric",
            "/google.logging.v2.MetricsServiceV2/UpdateLogMetric",
            logging_metrics.UpdateLogMetricRequest.serialize,
            logging_metrics.LogMetric.deserialize,
        )

    @property
    def delete_log_metric(
        self,
    ) -> Callable[[logging_metrics.DeleteLogMetricRequest], Awaitable[empty_pb2.Empty]]:
        r"""Return the gRPC callable for the delete log metric method.

        Deletes a logs-based metric.

        Returns:
            Callable[[~.DeleteLogMetricRequest],
                    Awaitable[~.Empty]]:
                A function that, when called, will call the underlying RPC
                on the server.
        """
        return self._cached_stub(
            "delete_log_metric",
            "/google.logging.v2.MetricsServiceV2/DeleteLogMetric",
            logging_metrics.DeleteLogMetricRequest.serialize,
            empty_pb2.Empty.FromString,
        )

    def _prep_wrapped_messages(self, client_info):
        """Precompute the wrapped methods with async wrappers.

        Overrides the base class method. Fills ``self._wrapped_methods``
        with one entry per RPC stub, keyed by the stub itself, each value
        being the stub wrapped with its default retry/timeout settings.
        """

        def transient_retry():
            # A fresh retry policy per method, retrying on deadline
            # exceeded, internal server error and service unavailable.
            return retries.AsyncRetry(
                initial=0.1,
                maximum=60.0,
                multiplier=1.3,
                predicate=retries.if_exception_type(
                    core_exceptions.DeadlineExceeded,
                    core_exceptions.InternalServerError,
                    core_exceptions.ServiceUnavailable,
                ),
                deadline=60.0,
            )

        # (stub, wrap options) in the order the entries are registered.
        wrap_plan = (
            (
                self.list_log_metrics,
                {"default_retry": transient_retry(), "default_timeout": 60.0},
            ),
            (
                self.get_log_metric,
                {"default_retry": transient_retry(), "default_timeout": 60.0},
            ),
            # Creation carries no default retry policy, only a timeout.
            (self.create_log_metric, {"default_timeout": 60.0}),
            (
                self.update_log_metric,
                {"default_retry": transient_retry(), "default_timeout": 60.0},
            ),
            (
                self.delete_log_metric,
                {"default_retry": transient_retry(), "default_timeout": 60.0},
            ),
            (self.cancel_operation, {"default_timeout": None}),
            (self.get_operation, {"default_timeout": None}),
            (self.list_operations, {"default_timeout": None}),
        )
        self._wrapped_methods = {
            stub: self._wrap_method(stub, **options, client_info=client_info)
            for stub, options in wrap_plan
        }

    def _wrap_method(self, func, *args, **kwargs):
        """Wrap ``func`` with ``gapic_v1.method_async.wrap_method``.

        Adds ``kind=self.kind`` to the keyword arguments when
        ``self._wrap_with_kind`` is set; everything else is forwarded as is.
        """
        if self._wrap_with_kind:  # pragma: NO COVER
            kwargs = {**kwargs, "kind": self.kind}
        return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

    def close(self):
        """Close the underlying channel and return whatever its ``close()`` returns."""
        logged_channel = self._logged_channel
        return logged_channel.close()

    @property
    def kind(self) -> str:
        """Return the identifier of this transport flavour."""
        transport_kind = "grpc_asyncio"
        return transport_kind

    @property
    def cancel_operation(
        self,
    ) -> Callable[[operations_pb2.CancelOperationRequest], None]:
        r"""Return the gRPC callable for the cancel_operation method."""
        # No response deserializer is configured for this RPC.
        return self._cached_stub(
            "cancel_operation",
            "/google.longrunning.Operations/CancelOperation",
            operations_pb2.CancelOperationRequest.SerializeToString,
            None,
        )

    @property
    def get_operation(
        self,
    ) -> Callable[[operations_pb2.GetOperationRequest], operations_pb2.Operation]:
        r"""Return the gRPC callable for the get_operation method."""
        return self._cached_stub(
            "get_operation",
            "/google.longrunning.Operations/GetOperation",
            operations_pb2.GetOperationRequest.SerializeToString,
            operations_pb2.Operation.FromString,
        )

    @property
    def list_operations(
        self,
    ) -> Callable[
        [operations_pb2.ListOperationsRequest], operations_pb2.ListOperationsResponse
    ]:
        r"""Return the gRPC callable for the list_operations method."""
        return self._cached_stub(
            "list_operations",
            "/google.longrunning.Operations/ListOperations",
            operations_pb2.ListOperationsRequest.SerializeToString,
            operations_pb2.ListOperationsResponse.FromString,
        )
627
628
# Public API of this module: only the AsyncIO transport class is exported.
__all__ = ("MetricsServiceV2GrpcAsyncIOTransport",)