# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gzip
import logging
import zlib
from os import environ
from typing import Dict, Optional, Sequence, Any, Callable, List, Mapping
from io import BytesIO
from time import sleep

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.sdk.metrics._internal.aggregation import Aggregation
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
    ExportMetricsServiceRequest,
)
from opentelemetry.proto.common.v1.common_pb2 import (
    AnyValue,
    ArrayValue,
    KeyValue,
    KeyValueList,
)
from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope
from opentelemetry.proto.resource.v1.resource_pb2 import Resource
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
from opentelemetry.sdk.environment_variables import (
    OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
    OTEL_EXPORTER_OTLP_ENDPOINT,
    OTEL_EXPORTER_OTLP_CERTIFICATE,
    OTEL_EXPORTER_OTLP_HEADERS,
    OTEL_EXPORTER_OTLP_TIMEOUT,
    OTEL_EXPORTER_OTLP_COMPRESSION,
    OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
    OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
    OTEL_EXPORTER_OTLP_METRICS_HEADERS,
    OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
    OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
)
from opentelemetry.sdk.metrics import (
    Counter,
    Histogram,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
)
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    Gauge,
    Histogram as HistogramType,
    MetricExporter,
    MetricExportResult,
    MetricsData,
    Sum,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_env_headers

import backoff
import requests

_logger = logging.getLogger(__name__)


DEFAULT_COMPRESSION = Compression.NoCompression
DEFAULT_ENDPOINT = "http://localhost:4318/"
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10  # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None

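# _expo returns a generator of exponentially growing delays (1, 2, 4, ...
# seconds with backoff's default arguments), capped at max_value, and behaves
# the same under backoff 1.x and 2.x.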

def _expo(*args, **kwargs):
    gen = backoff.expo(*args, **kwargs)
    if _is_backoff_v2:
        gen.send(None)
    return gen

class OTLPMetricExporter(MetricExporter):
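    """Exporter that sends metrics to an OTLP collector over HTTP, encoded
    as protobuf.

    Usage sketch (the endpoint below is an example; the wiring uses the
    standard SDK pipeline)::

        from opentelemetry.sdk.metrics import MeterProvider
        from opentelemetry.sdk.metrics.export import (
            PeriodicExportingMetricReader,
        )

        exporter = OTLPMetricExporter(
            endpoint="http://localhost:4318/v1/metrics"
        )
        reader = PeriodicExportingMetricReader(exporter)
        provider = MeterProvider(metric_readers=[reader])
    """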

    _MAX_RETRY_TIMEOUT = 64

    def __init__(
        self,
        endpoint: Optional[str] = None,
        certificate_file: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[int] = None,
        compression: Optional[Compression] = None,
        session: Optional[requests.Session] = None,
        preferred_temporality: Optional[Dict[type, AggregationTemporality]] = None,
        preferred_aggregation: Optional[Dict[type, Aggregation]] = None,
    ):
        self._endpoint = endpoint or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
            _append_metrics_path(
                environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, DEFAULT_ENDPOINT)
            ),
        )
        self._certificate_file = certificate_file or environ.get(
            OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE,
            environ.get(OTEL_EXPORTER_OTLP_CERTIFICATE, True),
        )
        headers_string = environ.get(
            OTEL_EXPORTER_OTLP_METRICS_HEADERS,
            environ.get(OTEL_EXPORTER_OTLP_HEADERS, ""),
        )
        self._headers = headers or parse_env_headers(headers_string)
        self._timeout = timeout or int(
            environ.get(
                OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
                environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
            )
        )
        self._compression = compression or _compression_from_env()
        self._session = session or requests.Session()
        self._session.headers.update(self._headers)
        self._session.headers.update(
            {"Content-Type": "application/x-protobuf"}
        )
        if self._compression is not Compression.NoCompression:
            self._session.headers.update(
                {"Content-Encoding": self._compression.value}
            )

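        # OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE selects DELTA or
        # CUMULATIVE temporality per instrument kind; an explicit
        # preferred_temporality argument overrides these environment-derived
        # defaults.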

        instrument_class_temporality = {}
        if (
            environ.get(
                OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
                "CUMULATIVE",
            )
            .upper()
            .strip()
            == "DELTA"
        ):
            instrument_class_temporality = {
                Counter: AggregationTemporality.DELTA,
                UpDownCounter: AggregationTemporality.CUMULATIVE,
                Histogram: AggregationTemporality.DELTA,
                ObservableCounter: AggregationTemporality.DELTA,
                ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                ObservableGauge: AggregationTemporality.CUMULATIVE,
            }
        else:
            instrument_class_temporality = {
                Counter: AggregationTemporality.CUMULATIVE,
                UpDownCounter: AggregationTemporality.CUMULATIVE,
                Histogram: AggregationTemporality.CUMULATIVE,
                ObservableCounter: AggregationTemporality.CUMULATIVE,
                ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                ObservableGauge: AggregationTemporality.CUMULATIVE,
            }
        instrument_class_temporality.update(preferred_temporality or {})

        MetricExporter.__init__(
            self,
            preferred_temporality=instrument_class_temporality,
            preferred_aggregation=preferred_aggregation,
        )

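    # POST the serialized protobuf payload to the endpoint, compressing the
    # request body first when gzip or deflate compression is configured.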

    def _export(self, serialized_data: bytes):
        data = serialized_data
        if self._compression == Compression.Gzip:
            gzip_data = BytesIO()
            with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
                gzip_stream.write(serialized_data)
            data = gzip_data.getvalue()
        elif self._compression == Compression.Deflate:
            data = zlib.compress(serialized_data)

        return self._session.post(
            url=self._endpoint,
            data=data,
            verify=self._certificate_file,
            timeout=self._timeout,
        )

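    # 408 (Request Timeout) and any 5xx response are treated as transient,
    # retryable failures.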

    @staticmethod
    def _retryable(resp: requests.Response) -> bool:
        if resp.status_code == 408:
            return True
        if 500 <= resp.status_code <= 599:
            return True
        return False

    def _translate_data(
        self, data: MetricsData
    ) -> ExportMetricsServiceRequest:

        resource_metrics_dict = {}

        for resource_metrics in data.resource_metrics:

            resource = resource_metrics.resource

            # It is safe to assume that each entry in data.resource_metrics
            # is associated with a unique resource.
            scope_metrics_dict = {}

            resource_metrics_dict[resource] = scope_metrics_dict

            for scope_metrics in resource_metrics.scope_metrics:

                instrumentation_scope = scope_metrics.scope

                # The SDK already groups metrics by instrumentation scope, so
                # there is no need to check for existing instrumentation
                # scopes here.
                pb2_scope_metrics = pb2.ScopeMetrics(
                    scope=InstrumentationScope(
                        name=instrumentation_scope.name,
                        version=instrumentation_scope.version,
                    )
                )

                scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

                for metric in scope_metrics.metrics:
                    pb2_metric = pb2.Metric(
                        name=metric.name,
                        description=metric.description,
                        unit=metric.unit,
                    )

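                    # Translate each supported point type (Gauge, Histogram,
                    # Sum) into its protobuf counterpart; unsupported types
                    # are skipped with a warning.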

                    if isinstance(metric.data, Gauge):
                        for data_point in metric.data.data_points:
                            pt = pb2.NumberDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                            )
                            if isinstance(data_point.value, int):
                                pt.as_int = data_point.value
                            else:
                                pt.as_double = data_point.value
                            pb2_metric.gauge.data_points.append(pt)

                    elif isinstance(metric.data, HistogramType):
                        for data_point in metric.data.data_points:
                            pt = pb2.HistogramDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                                start_time_unix_nano=(
                                    data_point.start_time_unix_nano
                                ),
                                count=data_point.count,
                                sum=data_point.sum,
                                bucket_counts=data_point.bucket_counts,
                                explicit_bounds=data_point.explicit_bounds,
                                max=data_point.max,
                                min=data_point.min,
                            )
                            pb2_metric.histogram.aggregation_temporality = (
                                metric.data.aggregation_temporality
                            )
                            pb2_metric.histogram.data_points.append(pt)

                    elif isinstance(metric.data, Sum):
                        for data_point in metric.data.data_points:
                            pt = pb2.NumberDataPoint(
                                attributes=self._translate_attributes(
                                    data_point.attributes
                                ),
                                start_time_unix_nano=(
                                    data_point.start_time_unix_nano
                                ),
                                time_unix_nano=data_point.time_unix_nano,
                            )
                            if isinstance(data_point.value, int):
                                pt.as_int = data_point.value
                            else:
                                pt.as_double = data_point.value
                            # note that because sum is a message type, the
                            # fields must be set individually rather than
                            # instantiating a pb2.Sum and setting it once
                            pb2_metric.sum.aggregation_temporality = (
                                metric.data.aggregation_temporality
                            )
                            pb2_metric.sum.is_monotonic = (
                                metric.data.is_monotonic
                            )
                            pb2_metric.sum.data_points.append(pt)
                    else:
                        _logger.warning(
                            "unsupported datapoint type %s", metric.data
                        )
                        continue

                    pb2_scope_metrics.metrics.append(pb2_metric)

        return ExportMetricsServiceRequest(
            resource_metrics=get_resource_data(
                resource_metrics_dict,
                pb2.ResourceMetrics,
                "metrics",
            )
        )

    def _translate_attributes(self, attributes) -> Sequence[KeyValue]:
        output = []
        if attributes:
            for key, value in attributes.items():
                try:
                    output.append(_translate_key_values(key, value))
                except Exception as error:  # pylint: disable=broad-except
                    _logger.exception(error)
        return output

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        serialized_data = self._translate_data(metrics_data)
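        # Retry transient failures with exponential backoff; give up once the
        # delay reaches _MAX_RETRY_TIMEOUT seconds.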

        for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

            if delay == self._MAX_RETRY_TIMEOUT:
                return MetricExportResult.FAILURE

            resp = self._export(serialized_data.SerializeToString())
            # pylint: disable=no-else-return
            if resp.status_code in (200, 202):
                return MetricExportResult.SUCCESS
            elif self._retryable(resp):
                _logger.warning(
                    "Transient error %s encountered while exporting metric batch, retrying in %ss.",
                    resp.reason,
                    delay,
                )
                sleep(delay)
                continue
            else:
                _logger.error(
                    "Failed to export batch code: %s, reason: %s",
                    resp.status_code,
                    resp.text,
                )
                return MetricExportResult.FAILURE
        return MetricExportResult.FAILURE

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        pass

    @property
    def _exporting(self) -> str:
        return "metrics"

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        return True

def _translate_value(value: Any) -> AnyValue:
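    # bool is checked before int because bool is a subclass of int in
    # Python; without this ordering, True/False would serialize as integers.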

    if isinstance(value, bool):
        any_value = AnyValue(bool_value=value)

    elif isinstance(value, str):
        any_value = AnyValue(string_value=value)

    elif isinstance(value, int):
        any_value = AnyValue(int_value=value)

    elif isinstance(value, float):
        any_value = AnyValue(double_value=value)

    elif isinstance(value, Sequence):
        any_value = AnyValue(
            array_value=ArrayValue(values=[_translate_value(v) for v in value])
        )

    elif isinstance(value, Mapping):
        any_value = AnyValue(
            kvlist_value=KeyValueList(
                values=[
                    _translate_key_values(str(k), v) for k, v in value.items()
                ]
            )
        )

    else:
        raise Exception(f"Invalid type {type(value)} of value {value}")

    return any_value

def _translate_key_values(key: str, value: Any) -> KeyValue:
    return KeyValue(key=key, value=_translate_value(value))

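# Build one protobuf resource entry per SDK resource: resource attributes are
# translated key by key, and the per-scope payloads are attached under
# "scope_<name>" (scope_metrics when name is "metrics").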

def get_resource_data(
    sdk_resource_scope_data: Dict[SDKResource, Any],  # ResourceDataT?
    resource_class: Callable[..., Resource],
    name: str,
) -> List[Resource]:

    resource_data = []

    for sdk_resource, scope_data in sdk_resource_scope_data.items():

        collector_resource = Resource()

        for key, value in sdk_resource.attributes.items():
            try:
                # pylint: disable=no-member
                collector_resource.attributes.append(
                    _translate_key_values(key, value)
                )
            except Exception as error:  # pylint: disable=broad-except
                _logger.exception(error)

        resource_data.append(
            resource_class(
                **{
                    "resource": collector_resource,
                    "scope_{}".format(name): scope_data.values(),
                }
            )
        )

    return resource_data

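# Map OTEL_EXPORTER_OTLP_(METRICS_)COMPRESSION ("gzip", "deflate" or "none")
# to a Compression member; an unrecognized value raises ValueError from the
# enum constructor.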

def _compression_from_env() -> Compression:
    compression = (
        environ.get(
            OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
            environ.get(OTEL_EXPORTER_OTLP_COMPRESSION, "none"),
        )
        .lower()
        .strip()
    )
    return Compression(compression)

def _append_metrics_path(endpoint: str) -> str:
    if endpoint.endswith("/"):
        return endpoint + DEFAULT_METRICS_EXPORT_PATH
    return endpoint + f"/{DEFAULT_METRICS_EXPORT_PATH}"
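# For example, both "http://localhost:4318" and "http://localhost:4318/"
# resolve to "http://localhost:4318/v1/metrics".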