1
#include "source/extensions/stat_sinks/open_telemetry/config.h"
2

            
3
#include "envoy/registry/registry.h"
4

            
5
#include "source/extensions/stat_sinks/open_telemetry/open_telemetry_http_impl.h"
6
#include "source/extensions/stat_sinks/open_telemetry/open_telemetry_impl.h"
7
#include "source/extensions/stat_sinks/open_telemetry/open_telemetry_proto_descriptors.h"
8
#include "source/extensions/tracers/opentelemetry/resource_detectors/resource_provider.h"
9

            
10
namespace Envoy {
11
namespace Extensions {
12
namespace StatSinks {
13
namespace OpenTelemetry {
14

            
15
/**
 * Builds an OpenTelemetry stats sink from the supplied sink configuration.
 *
 * The configuration is downcast to and validated as a SinkConfig, OTLP options and a metrics
 * flusher are created from it, and an exporter is selected from the configured protocol
 * specifier (gRPC service or HTTP service).
 *
 * @param config the sink configuration message.
 * @param server the server factory context; used here for the validation visitor, cluster
 *        manager, stats scope and time source.
 * @return the created sink; a non-OK status propagated from raw gRPC client creation (via
 *         RETURN_IF_NOT_OK_REF); or InvalidArgumentError when the protocol specifier is neither
 *         a gRPC service nor an HTTP service.
 */
absl::StatusOr<Stats::SinkPtr>
OpenTelemetrySinkFactory::createStatsSink(const Protobuf::Message& config,
                                          Server::Configuration::ServerFactoryContext& server) {
  validateProtoDescriptors();

  const auto& sink_config = MessageUtil::downcastAndValidate<const SinkConfig&>(
      config, server.messageValidationContext().staticValidationVisitor());

  // The resource is resolved from the configured detectors with an empty service name.
  Tracers::OpenTelemetry::ResourceProviderPtr resource_provider =
      std::make_unique<Tracers::OpenTelemetry::ResourceProviderImpl>();
  auto options = std::make_shared<OtlpOptions>(
      sink_config,
      resource_provider->getResource(sink_config.resource_detectors(), server,
                                     /*service_name=*/""),
      server);
  std::shared_ptr<OtlpMetricsFlusher> flusher = std::make_shared<OtlpMetricsFlusherImpl>(options);

  // Only the exporter differs between transports; pick it here and build the sink once below.
  std::shared_ptr<OtlpMetricsExporter> exporter;
  switch (sink_config.protocol_specifier_case()) {
  case SinkConfig::ProtocolSpecifierCase::kGrpcService: {
    auto raw_client = server.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient(
        sink_config.grpc_service(), server.scope(), false);
    RETURN_IF_NOT_OK_REF(raw_client.status());
    exporter = std::make_shared<OpenTelemetryGrpcMetricsExporterImpl>(options, raw_client.value());
    break;
  }

  case SinkConfig::ProtocolSpecifierCase::kHttpService: {
    exporter = std::make_shared<OpenTelemetryHttpMetricsExporter>(server.clusterManager(),
                                                                  sink_config.http_service());
    break;
  }

  default:
    return absl::InvalidArgumentError("unexpected Open Telemetry protocol case num");
  }

  // NOTE(review): count() is expressed in the system clock's native tick period, which is
  // implementation-defined -- confirm the sink expects that unit.
  return std::make_unique<OpenTelemetrySink>(
      flusher, exporter, server.timeSource().systemTime().time_since_epoch().count());
}
66

            
67
21
/**
 * @return a newly allocated, default-constructed SinkConfig message.
 */
ProtobufTypes::MessagePtr OpenTelemetrySinkFactory::createEmptyConfigProto() {
  ProtobufTypes::MessagePtr empty_config = std::make_unique<SinkConfig>();
  return empty_config;
}
70

            
71
26
/**
 * @return the name of this factory, taken from OpenTelemetryName.
 */
std::string OpenTelemetrySinkFactory::name() const {
  return OpenTelemetryName;
}
72

            
73
/**
74
 * Static registration for this sink factory. @see RegisterFactory.
75
 */
76
// NOTE(review): the string literal is presumably the legacy (deprecated) name this factory is
// also registered under -- confirm against the LEGACY_REGISTER_FACTORY macro definition.
LEGACY_REGISTER_FACTORY(
    OpenTelemetrySinkFactory, Server::Configuration::StatsSinkFactory,
    "envoy.open_telemetry_stat_sink");
78

            
79
} // namespace OpenTelemetry
80
} // namespace StatSinks
81
} // namespace Extensions
82
} // namespace Envoy