# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import logging
import zlib
from io import BytesIO
from os import environ
from time import sleep
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
    ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import (
    AnyValue,
    ArrayValue,
    InstrumentationScope,
    KeyValue,
    KeyValueList,
)
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
from opentelemetry.proto.resource.v1.resource_pb2 import Resource
from opentelemetry.sdk.environment_variables import (
    OTEL_EXPORTER_OTLP_CERTIFICATE,
    OTEL_EXPORTER_OTLP_COMPRESSION,
    OTEL_EXPORTER_OTLP_ENDPOINT,
    OTEL_EXPORTER_OTLP_HEADERS,
    OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
    OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
    OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
    OTEL_EXPORTER_OTLP_METRICS_HEADERS,
    OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
    OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
    OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.sdk.metrics import (
    Counter,
    Histogram,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    Gauge,
    Histogram as HistogramType,
    MetricExporter,
    MetricExportResult,
    MetricsData,
    Sum,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers

import backoff
import requests

_logger = logging.getLogger(__name__)


DEFAULT_COMPRESSION = Compression.NoCompression
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10  # in seconds

# Work around the API change between backoff 1.x and 2.x. Since 2.0.0 the
# backoff wait generator API requires a first .send(None) before reading the
# backoff values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
    gen = backoff.expo(*args, **kwargs)
    if _is_backoff_v2:
        gen.send(None)
    return gen
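
# As a rough illustration, with backoff's default parameters _expo() yields
# 1, 2, 4, 8, ... seconds, capped at ``max_value``.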


class OTLPMetricExporter(MetricExporter):
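    """OTLP metric exporter over HTTP using protobuf encoding.

    Configuration falls back from constructor arguments to the
    OTEL_EXPORTER_OTLP_METRICS_* environment variables, then to the generic
    OTEL_EXPORTER_OTLP_* variables, then to the module-level defaults.
    """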

    _MAX_RETRY_TIMEOUT = 64

    def __init__(
        self,
        endpoint: Optional[str] = None,
        certificate_file: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
        compression: Optional[Compression] = None,
        session: Optional[requests.Session] = None,
        preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None,
        preferred_aggregation: Optional[Dict[type, Aggregation]] = None,
    ):
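        # Resolve each setting with the precedence: explicit argument, then
        # the metrics-specific environment variable, then the generic OTLP
        # one, then the default.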
        self._endpoint = endpoint or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
            _append_metrics_path(
                environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
            ),
        )
        self._certificate_file = certificate_file or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
            environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True),
        )
        headers_string = environ.get(
            OTEL_EXPORTER_OTLP_METRICS_HEADERS,
            environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""),
        )
        self._headers = headers or parse_env_headers(headers_string)
        self._timeout = timeout or int(
            environ.get(
                OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
                environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
            )
        )
        self._compression = compression or _compression_from_env()
        self._session = session or requests.Session()
        self._session.headers.update(self._headers)
        self._session.headers.update(
            {"Content-Type": "application/x-protobuf"}
        )
        if self._compression is not Compression.NoCompression:
            self._session.headers.update(
                {"Content-Encoding": self._compression.value}
            )

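        # Pick the aggregation temporality for each instrument type. Under
        # "DELTA", monotonic sums and histograms become delta-encoded, while
        # up/down counters and gauges stay cumulative in either mode.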
        instrument_class_temporality = {}
        if (
            environ.get(
                OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
                "CUMULATIVE",
            )
            .upper()
            .strip()
            == "DELTA"
        ):
            instrument_class_temporality = {
                Counter: AggregationTemporality.DELTA,
                UpDownCounter: AggregationTemporality.CUMULATIVE,
                Histogram: AggregationTemporality.DELTA,
                ObservableCounter: AggregationTemporality.DELTA,
                ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                ObservableGauge: AggregationTemporality.CUMULATIVE,
            }
        else:
            instrument_class_temporality = {
                Counter: AggregationTemporality.CUMULATIVE,
                UpDownCounter: AggregationTemporality.CUMULATIVE,
                Histogram: AggregationTemporality.CUMULATIVE,
                ObservableCounter: AggregationTemporality.CUMULATIVE,
                ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                ObservableGauge: AggregationTemporality.CUMULATIVE,
            }
        instrument_class_temporality.update(preferred_temporality or {})

        MetricExporter.__init__(
            self,
            preferred_temporality=instrument_class_temporality,
            preferred_aggregation=preferred_aggregation,
        )

    def _export(self, serialized_data: bytes):
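        # Compress the serialized request to match the Content-Encoding
        # header configured on the session in __init__.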
        data = serialized_data
        if self._compression == Compression.Gzip:
            gzip_data = BytesIO()
            with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
                gzip_stream.write(serialized_data)
            data = gzip_data.getvalue()
        elif self._compression == Compression.Deflate:
            data = zlib.compress(serialized_data)

        return self._session.post(
            url=self._endpoint,
            data=data,
            verify=self._certificate_file,
            timeout=self._timeout,
        )

    @staticmethod
    def _retryable(resp: requests.Response) -> bool:
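        # Treat 408 (Request Timeout) and any 5xx status as transient.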
        if resp.status_code == 408:
            return True
        if 500 <= resp.status_code <= 599:
            return True
        return False

    def _translate_data(
        self, data: MetricsData
    ) -> ExportMetricsServiceRequest:

        resource_metrics_dict = {}

        for resource_metrics in data.resource_metrics:

            resource = resource_metrics.resource

            # It is safe to assume that each entry in data.resource_metrics
            # is associated with a unique resource.
            scope_metrics_dict = {}

            resource_metrics_dict[resource] = scope_metrics_dict

            for scope_metrics in resource_metrics.scope_metrics:

                instrumentation_scope = scope_metrics.scope

                # The SDK groups metrics in instrumentation scopes already so
                # there is no need to check for existing instrumentation
                # scopes here.
                pb2_scope_metrics = pb2.ScopeMetrics(
                    scope=InstrumentationScope(
                        name=instrumentation_scope.name,
                        version=instrumentation_scope.version,
                    )
                )

                scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

                for metric in scope_metrics.metrics:
                    pb2_metric = pb2.Metric(
                        name=metric.name,
                        description=metric.description,
                        unit=metric.unit,
                    )

                    if isinstance(metric.data, Gauge):
                        for data_point in metric.data.data_points:
                            pt = pb2.NumberDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                            )
                            if isinstance(data_point.value, int):
                                pt.as_int = data_point.value
                            else:
                                pt.as_double = data_point.value
                            pb2_metric.gauge.data_points.append(pt)

                    elif isinstance(metric.data, HistogramType):
                        for data_point in metric.data.data_points:
                            pt = pb2.HistogramDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                                start_time_unix_nano=(
                                    data_point.start_time_unix_nano
                                ),
                                count=data_point.count,
                                sum=data_point.sum,
                                bucket_counts=data_point.bucket_counts,
                                explicit_bounds=data_point.explicit_bounds,
                                max=data_point.max,
                                min=data_point.min,
                            )
                            pb2_metric.histogram.aggregation_temporality = (
                                metric.data.aggregation_temporality
                            )
                            pb2_metric.histogram.data_points.append(pt)

                    elif isinstance(metric.data, Sum):
                        for data_point in metric.data.data_points:
                            pt = pb2.NumberDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                start_time_unix_nano=(
                                    data_point.start_time_unix_nano
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                            )
                            if isinstance(data_point.value, int):
                                pt.as_int = data_point.value
                            else:
                                pt.as_double = data_point.value
                            # note that because sum is a message type, the
                            # fields must be set individually rather than
                            # instantiating a pb2.Sum and setting it once
                            pb2_metric.sum.aggregation_temporality = (
                                metric.data.aggregation_temporality
                            )
                            pb2_metric.sum.is_monotonic = (
                                metric.data.is_monotonic
                            )
                            pb2_metric.sum.data_points.append(pt)
                    else:
                        _logger.warning(
                            "unsupported datapoint type %s", metric.data
                        )
                        continue

                    pb2_scope_metrics.metrics.append(pb2_metric)

        return ExportMetricsServiceRequest(
            resource_metrics=get_resource_data(
                resource_metrics_dict,
                pb2.ResourceMetrics,
                "metrics",
            )
        )

    def _translate_attributes(self, attributes) -> Sequence[KeyValue]:
        output = []
        if attributes:
            for key, value in attributes.items():
                try:
                    output.append(_translate_key_values(key, value))
                except Exception as error:  # pylint: disable=broad-except
                    _logger.exception(error)
        return output

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
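        # Serialize once, then retry transient failures with exponential
        # backoff, giving up when the delay reaches _MAX_RETRY_TIMEOUT.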
        serialized_data = self._translate_data(metrics_data)
        for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

            if delay == self._MAX_RETRY_TIMEOUT:
                return MetricExportResult.FAILURE

            resp = self._export(serialized_data.SerializeToString())
            # pylint: disable=no-else-return
            if resp.status_code in (200, 202):
                return MetricExportResult.SUCCESS
            elif self._retryable(resp):
                _logger.warning(
                    "Transient error %s encountered while exporting metric batch, retrying in %ss.",
                    resp.reason,
                    delay,
                )
                sleep(delay)
                continue
            else:
                _logger.error(
                    "Failed to export batch code: %s, reason: %s",
                    resp.status_code,
                    resp.text,
                )
                return MetricExportResult.FAILURE
        return MetricExportResult.FAILURE

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
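        # No-op; note that the requests.Session (possibly caller-provided)
        # is not closed here.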
        pass

    @property
    def _exporting(self) -> str:
        return "metrics"

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        return True


def _translate_value(value: Any) -> AnyValue:
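    # Check bool before int: bool is a subclass of int in Python, so the
    # int branch would otherwise capture True and False.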
    if isinstance(value, bool):
        any_value = AnyValue(bool_value=value)

    elif isinstance(value, str):
        any_value = AnyValue(string_value=value)

    elif isinstance(value, int):
        any_value = AnyValue(int_value=value)

    elif isinstance(value, float):
        any_value = AnyValue(double_value=value)

    elif isinstance(value, Sequence):
        any_value = AnyValue(
            array_value=ArrayValue(values=[_translate_value(v) for v in value])
        )

    elif isinstance(value, Mapping):
        any_value = AnyValue(
            kvlist_value=KeyValueList(
                values=[
                    _translate_key_values(str(k), v) for k, v in value.items()
                ]
            )
        )

    else:
        raise Exception(f"Invalid type {type(value)} of value {value}")

    return any_value


def _translate_key_values(key: str, value: Any) -> KeyValue:
    return KeyValue(key=key, value=_translate_value(value))


def get_resource_data(
    sdk_resource_scope_data: Dict[SDKResource, Any],  # ResourceDataT?
    resource_class: Callable[..., Resource],
    name: str,
) -> List[Resource]:

    resource_data = []

    for (
        sdk_resource,
        scope_data,
    ) in sdk_resource_scope_data.items():

        collector_resource = Resource()

        for key, value in sdk_resource.attributes.items():

            try:
                # pylint: disable=no-member
                collector_resource.attributes.append(
                    _translate_key_values(key, value)
                )
            except Exception as error:  # pylint: disable=broad-except
                _logger.exception(error)

        resource_data.append(
            resource_class(
                **{
                    "resource": collector_resource,
                    "scope_{}".format(name): scope_data.values(),
                }
            )
        )

    return resource_data


def _compression_from_env() -> Compression:
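    # The metrics-specific variable wins over the generic one. Constructing
    # Compression() with anything but "none", "gzip", or "deflate" raises
    # ValueError.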
    compression = (
        environ.get(
            OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
            environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"),
        )
        .lower()
        .strip()
    )
    return Compression(compression)


def _append_metrics_path(endpoint: str) -> str:
    if endpoint.endswith("/"):
        return endpoint + DEFAULT_METRICS_EXPORT_PATH
    return endpoint + f"/{DEFAULT_METRICS_EXPORT_PATH}"
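

# Example wiring (an illustrative sketch, not part of this module's API):
#
#     from opentelemetry.sdk.metrics import MeterProvider
#     from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
#
#     exporter = OTLPMetricExporter()  # honors the OTEL_EXPORTER_OTLP_* env vars
#     reader = PeriodicExportingMetricReader(exporter)
#     provider = MeterProvider(metric_readers=[reader])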