// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 // Package otelconf provides an OpenTelemetry declarative configuration SDK. package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0" import ( "context" "crypto/tls" "crypto/x509" "errors" "fmt" "os" "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/log" nooplog "go.opentelemetry.io/otel/log/noop" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" nooptrace "go.opentelemetry.io/otel/trace/noop" yaml "go.yaml.in/yaml/v3" ) const ( protocolProtobufHTTP = "http/protobuf" protocolProtobufGRPC = "grpc" compressionGzip = "gzip" compressionNone = "none" ) type configOptions struct { ctx context.Context opentelemetryConfig OpenTelemetryConfiguration loggerProviderOptions []sdklog.LoggerProviderOption meterProviderOptions []sdkmetric.Option tracerProviderOptions []sdktrace.TracerProviderOption } type shutdownFunc func(context.Context) error func noopShutdown(context.Context) error { return nil } // SDK is a struct that contains all the providers // configured via the configuration model. type SDK struct { meterProvider metric.MeterProvider tracerProvider trace.TracerProvider loggerProvider log.LoggerProvider shutdown shutdownFunc } // TracerProvider returns a configured trace.TracerProvider. func (s *SDK) TracerProvider() trace.TracerProvider { return s.tracerProvider } // MeterProvider returns a configured metric.MeterProvider. func (s *SDK) MeterProvider() metric.MeterProvider { return s.meterProvider } // LoggerProvider returns a configured log.LoggerProvider. func (s *SDK) LoggerProvider() log.LoggerProvider { return s.loggerProvider } // Shutdown calls shutdown on all configured providers. func (s *SDK) Shutdown(ctx context.Context) error { return s.shutdown(ctx) } var noopSDK = SDK{ loggerProvider: nooplog.LoggerProvider{}, meterProvider: noopmetric.MeterProvider{}, tracerProvider: nooptrace.TracerProvider{}, shutdown: func(context.Context) error { return nil }, } // NewSDK creates SDK providers based on the configuration model. func NewSDK(opts ...ConfigurationOption) (SDK, error) { o := configOptions{ ctx: context.Background(), } for _, opt := range opts { o = opt.apply(o) } if o.opentelemetryConfig.Disabled != nil && *o.opentelemetryConfig.Disabled { return noopSDK, nil } r := newResource(o.opentelemetryConfig.Resource) mp, mpShutdown, err := meterProvider(o, r) if err != nil { return noopSDK, err } tp, tpShutdown, err := tracerProvider(o, r) if err != nil { return noopSDK, err } lp, lpShutdown, err := loggerProvider(o, r) if err != nil { return noopSDK, err } return SDK{ meterProvider: mp, tracerProvider: tp, loggerProvider: lp, shutdown: func(ctx context.Context) error { return errors.Join(mpShutdown(ctx), tpShutdown(ctx), lpShutdown(ctx)) }, }, nil } // ConfigurationOption configures options for providers. type ConfigurationOption interface { apply(configOptions) configOptions } type configurationOptionFunc func(configOptions) configOptions func (fn configurationOptionFunc) apply(cfg configOptions) configOptions { return fn(cfg) } // WithContext sets the context.Context for the SDK. 
func WithContext(ctx context.Context) ConfigurationOption { return configurationOptionFunc(func(c configOptions) configOptions { c.ctx = ctx return c }) } // WithOpenTelemetryConfiguration sets the OpenTelemetryConfiguration used // to produce the SDK. func WithOpenTelemetryConfiguration(cfg OpenTelemetryConfiguration) ConfigurationOption { return configurationOptionFunc(func(c configOptions) configOptions { c.opentelemetryConfig = cfg return c }) } // WithLoggerProviderOptions appends LoggerProviderOptions used for constructing // the LoggerProvider. OpenTelemetryConfiguration takes precedence over these options. func WithLoggerProviderOptions(opts ...sdklog.LoggerProviderOption) ConfigurationOption { return configurationOptionFunc(func(c configOptions) configOptions { c.loggerProviderOptions = append(c.loggerProviderOptions, opts...) return c }) } // WithMeterProviderOptions appends metric.Options used for constructing the // MeterProvider. OpenTelemetryConfiguration takes precedence over these options. func WithMeterProviderOptions(opts ...sdkmetric.Option) ConfigurationOption { return configurationOptionFunc(func(c configOptions) configOptions { c.meterProviderOptions = append(c.meterProviderOptions, opts...) return c }) } // WithTracerProviderOptions appends TracerProviderOptions used for constructing // the TracerProvider. OpenTelemetryConfiguration takes precedence over these options. func WithTracerProviderOptions(opts ...sdktrace.TracerProviderOption) ConfigurationOption { return configurationOptionFunc(func(c configOptions) configOptions { c.tracerProviderOptions = append(c.tracerProviderOptions, opts...) return c }) } // ParseYAML parses a YAML configuration file into an OpenTelemetryConfiguration. func ParseYAML(file []byte) (*OpenTelemetryConfiguration, error) { var cfg OpenTelemetryConfiguration err := yaml.Unmarshal(file, &cfg) if err != nil { return nil, err } return &cfg, nil } // createTLSConfig creates a tls.Config from certificate files. func createTLSConfig(caCertFile, clientCertFile, clientKeyFile *string) (*tls.Config, error) { tlsConfig := &tls.Config{} if caCertFile != nil { caText, err := os.ReadFile(*caCertFile) if err != nil { return nil, err } certPool := x509.NewCertPool() if !certPool.AppendCertsFromPEM(caText) { return nil, errors.New("could not create certificate authority chain from certificate") } tlsConfig.RootCAs = certPool } if clientCertFile != nil { if clientKeyFile == nil { return nil, errors.New("client certificate was provided but no client key was provided") } clientCert, err := tls.LoadX509KeyPair(*clientCertFile, *clientKeyFile) if err != nil { return nil, fmt.Errorf("could not use client certificate: %w", err) } tlsConfig.Certificates = []tls.Certificate{clientCert} } return tlsConfig, nil } // createHeadersConfig combines the two header config fields. Headers take precedence over headersList. func createHeadersConfig(headers []NameStringValuePair, headersList *string) (map[string]string, error) { result := make(map[string]string) if headersList != nil { // Parsing follows https://github.com/open-telemetry/opentelemetry-configuration/blob/568e5080816d40d75792eb754fc96bde09654159/schema/type_descriptions.yaml#L584. 
headerslist, err := baggage.Parse(*headersList) if err != nil { return nil, fmt.Errorf("invalid headers list: %w", err) } for _, kv := range headerslist.Members() { result[kv.Key()] = kv.Value() } } // Headers take precedence over HeadersList, so this has to be after HeadersList is processed if len(headers) > 0 { for _, kv := range headers { if kv.Value != nil { result[kv.Name] = *kv.Value } } } return result, nil }
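// Illustrative usage sketch (not part of the package sources): it shows how the
// pieces above are typically combined. ParseYAML builds the configuration
// model, NewSDK turns it into providers, and Shutdown flushes them on exit.
// The "config.yaml" path is hypothetical, and registering the providers
// globally (otel.Set*, global.SetLoggerProvider) is optional.
package otelconf_test

import (
	"context"
	"log"
	"os"

	"go.opentelemetry.io/otel"
	logglobal "go.opentelemetry.io/otel/log/global"

	otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0"
)

func initFromYAMLFile(ctx context.Context) func(context.Context) error {
	b, err := os.ReadFile("config.yaml") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	cfg, err := otelconf.ParseYAML(b)
	if err != nil {
		log.Fatal(err)
	}
	sdk, err := otelconf.NewSDK(
		otelconf.WithContext(ctx),
		otelconf.WithOpenTelemetryConfiguration(*cfg),
	)
	if err != nil {
		log.Fatal(err)
	}
	otel.SetTracerProvider(sdk.TracerProvider())
	otel.SetMeterProvider(sdk.MeterProvider())
	logglobal.SetLoggerProvider(sdk.LoggerProvider())
	return sdk.Shutdown // call on exit to flush all providers
}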
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0" import ( "encoding/json" "errors" "fmt" "reflect" ) // MarshalJSON implements json.Marshaler. func (j *AttributeNameValueType) MarshalJSON() ([]byte, error) { return json.Marshal(j.Value) } var enumValuesAttributeNameValueType = []any{ nil, "string", "bool", "int", "double", "string_array", "bool_array", "int_array", "double_array", } // UnmarshalJSON implements json.Unmarshaler. func (j *AttributeNameValueType) UnmarshalJSON(b []byte) error { var v struct { Value any } if err := json.Unmarshal(b, &v.Value); err != nil { return err } var ok bool for _, expected := range enumValuesAttributeNameValueType { if reflect.DeepEqual(v.Value, expected) { ok = true break } } if !ok { return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesAttributeNameValueType, v.Value) } *j = AttributeNameValueType(v) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *BatchLogRecordProcessor) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in BatchLogRecordProcessor: required") } type Plain BatchLogRecordProcessor var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = BatchLogRecordProcessor(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *BatchSpanProcessor) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in BatchSpanProcessor: required") } type Plain BatchSpanProcessor var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = BatchSpanProcessor(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *GeneralInstrumentationPeerServiceMappingElem) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["peer"]; raw != nil && !ok { return errors.New("field peer in GeneralInstrumentationPeerServiceMappingElem: required") } if _, ok := raw["service"]; raw != nil && !ok { return errors.New("field service in GeneralInstrumentationPeerServiceMappingElem: required") } type Plain GeneralInstrumentationPeerServiceMappingElem var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = GeneralInstrumentationPeerServiceMappingElem(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. 
func (j *NameStringValuePair) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["name"]; !ok { return errors.New("json: cannot unmarshal field name in NameStringValuePair required") } if _, ok := raw["value"]; !ok { return errors.New("json: cannot unmarshal field value in NameStringValuePair required") } var name, value string var ok bool if name, ok = raw["name"].(string); !ok { return errors.New("yaml: cannot unmarshal field name in NameStringValuePair must be string") } if value, ok = raw["value"].(string); !ok { return errors.New("yaml: cannot unmarshal field value in NameStringValuePair must be string") } *j = NameStringValuePair{ Name: name, Value: &value, } return nil } var enumValuesOTLPMetricDefaultHistogramAggregation = []any{ "explicit_bucket_histogram", "base2_exponential_bucket_histogram", } // UnmarshalJSON implements json.Unmarshaler. func (j *OTLPMetricDefaultHistogramAggregation) UnmarshalJSON(b []byte) error { var v string if err := json.Unmarshal(b, &v); err != nil { return err } var ok bool for _, expected := range enumValuesOTLPMetricDefaultHistogramAggregation { if reflect.DeepEqual(v, expected) { ok = true break } } if !ok { return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesOTLPMetricDefaultHistogramAggregation, v) } *j = OTLPMetricDefaultHistogramAggregation(v) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *OTLPMetric) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["endpoint"]; raw != nil && !ok { return errors.New("field endpoint in OTLPMetric: required") } if _, ok := raw["protocol"]; raw != nil && !ok { return errors.New("field protocol in OTLPMetric: required") } type Plain OTLPMetric var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = OTLPMetric(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *OTLP) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["endpoint"]; raw != nil && !ok { return errors.New("field endpoint in OTLP: required") } if _, ok := raw["protocol"]; raw != nil && !ok { return errors.New("field protocol in OTLP: required") } type Plain OTLP var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = OTLP(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *OpenTelemetryConfiguration) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["file_format"]; raw != nil && !ok { return errors.New("field file_format in OpenTelemetryConfiguration: required") } type Plain OpenTelemetryConfiguration var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = OpenTelemetryConfiguration(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *PeriodicMetricReader) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in PeriodicMetricReader: required") } type Plain PeriodicMetricReader var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = PeriodicMetricReader(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. 
func (j *PullMetricReader) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in PullMetricReader: required") } type Plain PullMetricReader var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = PullMetricReader(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *SimpleLogRecordProcessor) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in SimpleLogRecordProcessor: required") } type Plain SimpleLogRecordProcessor var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = SimpleLogRecordProcessor(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *SimpleSpanProcessor) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["exporter"]; raw != nil && !ok { return errors.New("field exporter in SimpleSpanProcessor: required") } type Plain SimpleSpanProcessor var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = SimpleSpanProcessor(plain) return nil } var enumValuesViewSelectorInstrumentType = []any{ "counter", "histogram", "observable_counter", "observable_gauge", "observable_up_down_counter", "up_down_counter", } // UnmarshalJSON implements json.Unmarshaler. func (j *ViewSelectorInstrumentType) UnmarshalJSON(b []byte) error { var v string if err := json.Unmarshal(b, &v); err != nil { return err } var ok bool for _, expected := range enumValuesViewSelectorInstrumentType { if reflect.DeepEqual(v, expected) { ok = true break } } if !ok { return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesViewSelectorInstrumentType, v) } *j = ViewSelectorInstrumentType(v) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *Zipkin) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["endpoint"]; raw != nil && !ok { return errors.New("field endpoint in Zipkin: required") } type Plain Zipkin var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } *j = Zipkin(plain) return nil } // UnmarshalJSON implements json.Unmarshaler. func (j *AttributeNameValue) UnmarshalJSON(b []byte) error { var raw map[string]any if err := json.Unmarshal(b, &raw); err != nil { return err } if _, ok := raw["name"]; raw != nil && !ok { return errors.New("field name in AttributeNameValue: required") } if _, ok := raw["value"]; raw != nil && !ok { return errors.New("field value in AttributeNameValue: required") } type Plain AttributeNameValue var plain Plain if err := json.Unmarshal(b, &plain); err != nil { return err } if plain.Type != nil && plain.Type.Value == "int" { val, ok := plain.Value.(float64) if ok { plain.Value = int(val) } } if plain.Type != nil && plain.Type.Value == "int_array" { m, ok := plain.Value.([]any) if ok { var vals []any for _, v := range m { val, ok := v.(float64) if ok { vals = append(vals, int(val)) } else { vals = append(vals, val) } } plain.Value = vals } } *j = AttributeNameValue(plain) return nil }
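// Illustrative sketch (not part of the generated code): the UnmarshalJSON
// methods above enforce required fields and coerce numeric attribute values.
// The JSON field names used below ("schedule_delay", "type", ...) follow the
// opentelemetry-configuration schema and are assumptions for illustration.
package otelconf_test

import (
	"encoding/json"
	"fmt"

	otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0"
)

func demonstrateStrictUnmarshal() {
	// A BatchSpanProcessor without the required "exporter" key is rejected
	// before any other field is decoded.
	var bsp otelconf.BatchSpanProcessor
	err := json.Unmarshal([]byte(`{"schedule_delay": 5000}`), &bsp)
	fmt.Println(err) // field exporter in BatchSpanProcessor: required

	// Attribute values typed "int" arrive as float64 from encoding/json and
	// are converted to int by AttributeNameValue.UnmarshalJSON.
	var attr otelconf.AttributeNameValue
	_ = json.Unmarshal([]byte(`{"name": "retry.count", "value": 3, "type": "int"}`), &attr)
	fmt.Printf("%T\n", attr.Value) // int
}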
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"

import (
	"errors"
	"fmt"
	"reflect"
)

// UnmarshalYAML implements yaml.Unmarshaler.
func (j *AttributeNameValueType) UnmarshalYAML(unmarshal func(any) error) error {
	var v struct {
		Value any
	}
	if err := unmarshal(&v.Value); err != nil {
		return err
	}
	var ok bool
	for _, expected := range enumValuesAttributeNameValueType {
		if reflect.DeepEqual(v.Value, expected) {
			ok = true
			break
		}
	}
	if !ok {
		return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesAttributeNameValueType, v.Value)
	}
	*j = AttributeNameValueType(v)
	return nil
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (j *NameStringValuePair) UnmarshalYAML(unmarshal func(any) error) error {
	var raw map[string]any
	if err := unmarshal(&raw); err != nil {
		return err
	}
	if _, ok := raw["name"]; !ok {
		return errors.New("yaml: cannot unmarshal field name in NameStringValuePair required")
	}
	if _, ok := raw["value"]; !ok {
		return errors.New("yaml: cannot unmarshal field value in NameStringValuePair required")
	}
	var name, value string
	var ok bool
	if name, ok = raw["name"].(string); !ok {
		return errors.New("yaml: cannot unmarshal field name in NameStringValuePair must be string")
	}
	if value, ok = raw["value"].(string); !ok {
		return errors.New("yaml: cannot unmarshal field value in NameStringValuePair must be string")
	}
	*j = NameStringValuePair{
		Name:  name,
		Value: &value,
	}
	return nil
}

// UnmarshalYAML implements yaml.Unmarshaler.
func (j *LanguageSpecificInstrumentation) UnmarshalYAML(unmarshal func(any) error) error {
	var raw map[string]any
	if err := unmarshal(&raw); err != nil {
		return err
	}
	*j = raw
	return nil
}
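// Illustrative sketch (not part of the generated code): the UnmarshalYAML
// methods above mirror the JSON validation. NameStringValuePair requires both
// keys and rejects non-string values, so numeric header values must be quoted
// in YAML documents.
package otelconf_test

import (
	"fmt"

	yaml "go.yaml.in/yaml/v3"

	otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0"
)

func demonstrateYAMLValidation() {
	var pair otelconf.NameStringValuePair

	// An unquoted 123 decodes as an int, which the string check rejects.
	err := yaml.Unmarshal([]byte("name: api-key\nvalue: 123"), &pair)
	fmt.Println(err != nil) // true

	// Quoting the value satisfies the "must be string" requirement.
	err = yaml.Unmarshal([]byte("name: api-key\nvalue: \"123\""), &pair)
	fmt.Println(err, pair.Name, *pair.Value) // <nil> api-key 123
}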
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0" import ( "context" "errors" "fmt" "net/url" "time" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/noop" sdklog "go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc/credentials" ) func loggerProvider(cfg configOptions, res *resource.Resource) (log.LoggerProvider, shutdownFunc, error) { if cfg.opentelemetryConfig.LoggerProvider == nil { return noop.NewLoggerProvider(), noopShutdown, nil } opts := append(cfg.loggerProviderOptions, sdklog.WithResource(res)) var errs []error for _, processor := range cfg.opentelemetryConfig.LoggerProvider.Processors { sp, err := logProcessor(cfg.ctx, processor) if err == nil { opts = append(opts, sdklog.WithProcessor(sp)) } else { errs = append(errs, err) } } if len(errs) > 0 { return noop.NewLoggerProvider(), noopShutdown, errors.Join(errs...) } lp := sdklog.NewLoggerProvider(opts...) return lp, lp.Shutdown, nil } func logProcessor(ctx context.Context, processor LogRecordProcessor) (sdklog.Processor, error) { if processor.Batch != nil && processor.Simple != nil { return nil, errors.New("must not specify multiple log processor type") } if processor.Batch != nil { exp, err := logExporter(ctx, processor.Batch.Exporter) if err != nil { return nil, err } return batchLogProcessor(processor.Batch, exp) } if processor.Simple != nil { exp, err := logExporter(ctx, processor.Simple.Exporter) if err != nil { return nil, err } return sdklog.NewSimpleProcessor(exp), nil } return nil, errors.New("unsupported log processor type, must be one of simple or batch") } func logExporter(ctx context.Context, exporter LogRecordExporter) (sdklog.Exporter, error) { if exporter.Console != nil && exporter.OTLP != nil { return nil, errors.New("must not specify multiple exporters") } if exporter.Console != nil { return stdoutlog.New( stdoutlog.WithPrettyPrint(), ) } if exporter.OTLP != nil && exporter.OTLP.Protocol != nil { switch *exporter.OTLP.Protocol { case protocolProtobufHTTP: return otlpHTTPLogExporter(ctx, exporter.OTLP) case protocolProtobufGRPC: return otlpGRPCLogExporter(ctx, exporter.OTLP) default: return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol) } } return nil, errors.New("no valid log exporter") } func batchLogProcessor(blp *BatchLogRecordProcessor, exp sdklog.Exporter) (*sdklog.BatchProcessor, error) { var opts []sdklog.BatchProcessorOption if blp.ExportTimeout != nil { if *blp.ExportTimeout < 0 { return nil, fmt.Errorf("invalid export timeout %d", *blp.ExportTimeout) } opts = append(opts, sdklog.WithExportTimeout(time.Millisecond*time.Duration(*blp.ExportTimeout))) } if blp.MaxExportBatchSize != nil { if *blp.MaxExportBatchSize < 0 { return nil, fmt.Errorf("invalid batch size %d", *blp.MaxExportBatchSize) } opts = append(opts, sdklog.WithExportMaxBatchSize(*blp.MaxExportBatchSize)) } if blp.MaxQueueSize != nil { if *blp.MaxQueueSize < 0 { return nil, fmt.Errorf("invalid queue size %d", *blp.MaxQueueSize) } opts = append(opts, sdklog.WithMaxQueueSize(*blp.MaxQueueSize)) } if blp.ScheduleDelay != nil { if *blp.ScheduleDelay < 0 { return nil, fmt.Errorf("invalid schedule delay %d", *blp.ScheduleDelay) } opts = append(opts, 
sdklog.WithExportInterval(time.Millisecond*time.Duration(*blp.ScheduleDelay))) } return sdklog.NewBatchProcessor(exp, opts...), nil } func otlpHTTPLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter, error) { var opts []otlploghttp.Option if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } opts = append(opts, otlploghttp.WithEndpoint(u.Host)) if u.Scheme == "http" { opts = append(opts, otlploghttp.WithInsecure()) } if u.Path != "" { opts = append(opts, otlploghttp.WithURLPath(u.Path)) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlploghttp.WithCompression(otlploghttp.GzipCompression)) case compressionNone: opts = append(opts, otlploghttp.WithCompression(otlploghttp.NoCompression)) default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { opts = append(opts, otlploghttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlploghttp.WithHeaders(headersConfig)) } tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig)) return otlploghttp.New(ctx, opts...) } func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter, error) { var opts []otlploggrpc.Option if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } // ParseRequestURI leaves the Host field empty when no // scheme is specified (i.e. localhost:4317). This check is // here to support the case where a user may not specify a // scheme. The code does its best effort here by using // otlpConfig.Endpoint as-is in that case if u.Host != "" { opts = append(opts, otlploggrpc.WithEndpoint(u.Host)) } else { opts = append(opts, otlploggrpc.WithEndpoint(*otlpConfig.Endpoint)) } if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) { opts = append(opts, otlploggrpc.WithInsecure()) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlploggrpc.WithCompressor(*otlpConfig.Compression)) case compressionNone: // none requires no options default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { opts = append(opts, otlploggrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlploggrpc.WithHeaders(headersConfig)) } if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil { tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) } return otlploggrpc.New(ctx, opts...) }
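// Illustrative sketch (not part of the package sources): a configuration that
// exercises the log pipeline above, a batch processor wrapping an OTLP
// http/protobuf exporter. The YAML keys follow the upstream
// opentelemetry-configuration schema and are assumptions here, as is the
// collector endpoint.
package otelconf_test

import (
	"context"

	otelconf "go.opentelemetry.io/contrib/otelconf/v0.3.0"
)

const logConfigYAML = `
file_format: "0.3"
logger_provider:
  processors:
    - batch:
        schedule_delay: 1000
        exporter:
          otlp:
            protocol: http/protobuf
            endpoint: http://localhost:4318
            compression: gzip
`

func newLogSDK(ctx context.Context) (otelconf.SDK, error) {
	cfg, err := otelconf.ParseYAML([]byte(logConfigYAML))
	if err != nil {
		return otelconf.SDK{}, err
	}
	return otelconf.NewSDK(
		otelconf.WithContext(ctx),
		otelconf.WithOpenTelemetryConfiguration(*cfg),
	)
}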
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0" import ( "context" "encoding/json" "errors" "fmt" "math" "net" "net/http" "net/url" "os" "strconv" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc/credentials" ) var zeroScope instrumentation.Scope const instrumentKindUndefined = sdkmetric.InstrumentKind(0) func meterProvider(cfg configOptions, res *resource.Resource) (metric.MeterProvider, shutdownFunc, error) { if cfg.opentelemetryConfig.MeterProvider == nil { return noop.NewMeterProvider(), noopShutdown, nil } opts := append(cfg.meterProviderOptions, sdkmetric.WithResource(res)) var errs []error for _, reader := range cfg.opentelemetryConfig.MeterProvider.Readers { r, err := metricReader(cfg.ctx, reader) if err == nil { opts = append(opts, sdkmetric.WithReader(r)) } else { errs = append(errs, err) } } for _, vw := range cfg.opentelemetryConfig.MeterProvider.Views { v, err := view(vw) if err == nil { opts = append(opts, sdkmetric.WithView(v)) } else { errs = append(errs, err) } } if len(errs) > 0 { return noop.NewMeterProvider(), noopShutdown, errors.Join(errs...) } mp := sdkmetric.NewMeterProvider(opts...) return mp, mp.Shutdown, nil } func metricReader(ctx context.Context, r MetricReader) (sdkmetric.Reader, error) { if r.Periodic != nil && r.Pull != nil { return nil, errors.New("must not specify multiple metric reader type") } if r.Periodic != nil { var opts []sdkmetric.PeriodicReaderOption if r.Periodic.Interval != nil { opts = append(opts, sdkmetric.WithInterval(time.Duration(*r.Periodic.Interval)*time.Millisecond)) } if r.Periodic.Timeout != nil { opts = append(opts, sdkmetric.WithTimeout(time.Duration(*r.Periodic.Timeout)*time.Millisecond)) } return periodicExporter(ctx, r.Periodic.Exporter, opts...) 
} if r.Pull != nil { return pullReader(ctx, r.Pull.Exporter) } return nil, errors.New("no valid metric reader") } func pullReader(ctx context.Context, exporter PullMetricExporter) (sdkmetric.Reader, error) { if exporter.Prometheus != nil { return prometheusReader(ctx, exporter.Prometheus) } return nil, errors.New("no valid metric exporter") } func periodicExporter(ctx context.Context, exporter PushMetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, error) { if exporter.Console != nil && exporter.OTLP != nil { return nil, errors.New("must not specify multiple exporters") } if exporter.Console != nil { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") exp, err := stdoutmetric.New( stdoutmetric.WithEncoder(enc), ) if err != nil { return nil, err } return sdkmetric.NewPeriodicReader(exp, opts...), nil } if exporter.OTLP != nil && exporter.OTLP.Protocol != nil { var err error var exp sdkmetric.Exporter switch *exporter.OTLP.Protocol { case protocolProtobufHTTP: exp, err = otlpHTTPMetricExporter(ctx, exporter.OTLP) case protocolProtobufGRPC: exp, err = otlpGRPCMetricExporter(ctx, exporter.OTLP) default: return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol) } if err != nil { return nil, err } return sdkmetric.NewPeriodicReader(exp, opts...), nil } return nil, errors.New("no valid metric exporter") } func otlpHTTPMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmetric.Exporter, error) { opts := []otlpmetrichttp.Option{} if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } opts = append(opts, otlpmetrichttp.WithEndpoint(u.Host)) if u.Scheme == "http" { opts = append(opts, otlpmetrichttp.WithInsecure()) } if u.Path != "" { opts = append(opts, otlpmetrichttp.WithURLPath(u.Path)) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression)) case compressionNone: opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.NoCompression)) default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil { opts = append(opts, otlpmetrichttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlpmetrichttp.WithHeaders(headersConfig)) } if otlpConfig.TemporalityPreference != nil { switch *otlpConfig.TemporalityPreference { case "delta": opts = append(opts, otlpmetrichttp.WithTemporalitySelector(deltaTemporality)) case "cumulative": opts = append(opts, otlpmetrichttp.WithTemporalitySelector(cumulativeTemporality)) case "lowmemory": opts = append(opts, otlpmetrichttp.WithTemporalitySelector(lowMemory)) default: return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) } } tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig)) return otlpmetrichttp.New(ctx, opts...) 
} func otlpGRPCMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmetric.Exporter, error) { var opts []otlpmetricgrpc.Option if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } // ParseRequestURI leaves the Host field empty when no // scheme is specified (i.e. localhost:4317). This check is // here to support the case where a user may not specify a // scheme. The code does its best effort here by using // otlpConfig.Endpoint as-is in that case if u.Host != "" { opts = append(opts, otlpmetricgrpc.WithEndpoint(u.Host)) } else { opts = append(opts, otlpmetricgrpc.WithEndpoint(*otlpConfig.Endpoint)) } if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) { opts = append(opts, otlpmetricgrpc.WithInsecure()) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlpmetricgrpc.WithCompressor(*otlpConfig.Compression)) case compressionNone: // none requires no options default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { opts = append(opts, otlpmetricgrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlpmetricgrpc.WithHeaders(headersConfig)) } if otlpConfig.TemporalityPreference != nil { switch *otlpConfig.TemporalityPreference { case "delta": opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(deltaTemporality)) case "cumulative": opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(cumulativeTemporality)) case "lowmemory": opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(lowMemory)) default: return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference) } } if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil { tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) } return otlpmetricgrpc.New(ctx, opts...) } func cumulativeTemporality(sdkmetric.InstrumentKind) metricdata.Temporality { return metricdata.CumulativeTemporality } func deltaTemporality(ik sdkmetric.InstrumentKind) metricdata.Temporality { switch ik { case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram, sdkmetric.InstrumentKindObservableCounter: return metricdata.DeltaTemporality default: return metricdata.CumulativeTemporality } } func lowMemory(ik sdkmetric.InstrumentKind) metricdata.Temporality { switch ik { case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram: return metricdata.DeltaTemporality default: return metricdata.CumulativeTemporality } } // newIncludeExcludeFilter returns a Filter that includes attributes // in the include list and excludes attributes in the excludes list. // It returns an error if an attribute is in both lists // // If IncludeExclude is empty a include-all filter is returned. 
func newIncludeExcludeFilter(lists *IncludeExclude) (attribute.Filter, error) { if lists == nil { return func(attribute.KeyValue) bool { return true }, nil } included := make(map[attribute.Key]struct{}) for _, k := range lists.Included { included[attribute.Key(k)] = struct{}{} } excluded := make(map[attribute.Key]struct{}) for _, k := range lists.Excluded { if _, ok := included[attribute.Key(k)]; ok { return nil, fmt.Errorf("attribute cannot be in both include and exclude list: %s", k) } excluded[attribute.Key(k)] = struct{}{} } return func(kv attribute.KeyValue) bool { // check if a value is excluded first if _, ok := excluded[kv.Key]; ok { return false } if len(included) == 0 { return true } _, ok := included[kv.Key] return ok }, nil } func prometheusReader(ctx context.Context, prometheusConfig *Prometheus) (sdkmetric.Reader, error) { if prometheusConfig.Host == nil { return nil, errors.New("host must be specified") } if prometheusConfig.Port == nil { return nil, errors.New("port must be specified") } opts, err := prometheusReaderOpts(prometheusConfig) if err != nil { return nil, err } reg := prometheus.NewRegistry() opts = append(opts, otelprom.WithRegisterer(reg)) reader, err := otelprom.New(opts...) if err != nil { return nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) } mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) server := http.Server{ // Timeouts are necessary to make a server resilient to attacks. // We use values from this example: https://blog.cloudflare.com/exposing-go-on-the-internet/#:~:text=There%20are%20three%20main%20timeouts ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 120 * time.Second, Handler: mux, } // Remove surrounding "[]" from the host definition to allow users to define the host as "[::1]" or "::1". host := *prometheusConfig.Host if len(host) > 2 && host[0] == '[' && host[len(host)-1] == ']' { host = host[1 : len(host)-1] } addr := net.JoinHostPort(host, strconv.Itoa(*prometheusConfig.Port)) lis, err := net.Listen("tcp", addr) if err != nil { return nil, errors.Join( fmt.Errorf("binding address %s for Prometheus exporter: %w", addr, err), reader.Shutdown(ctx), ) } // Only for testing reasons, add the address to the http Server, will not be used. server.Addr = lis.Addr().String() go func() { if err := server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) { otel.Handle(fmt.Errorf("the Prometheus HTTP server exited unexpectedly: %w", err)) } }() return readerWithServer{reader, &server}, nil } func prometheusReaderOpts(prometheusConfig *Prometheus) ([]otelprom.Option, error) { var opts []otelprom.Option if prometheusConfig.WithoutScopeInfo != nil && *prometheusConfig.WithoutScopeInfo { opts = append(opts, otelprom.WithoutScopeInfo()) } if prometheusConfig.WithoutTypeSuffix != nil && *prometheusConfig.WithoutTypeSuffix { opts = append(opts, otelprom.WithoutCounterSuffixes()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility. } if prometheusConfig.WithoutUnits != nil && *prometheusConfig.WithoutUnits { opts = append(opts, otelprom.WithoutUnits()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility. 
} if prometheusConfig.WithResourceConstantLabels != nil { f, err := newIncludeExcludeFilter(prometheusConfig.WithResourceConstantLabels) if err != nil { return nil, err } opts = append(opts, otelprom.WithResourceAsConstantLabels(f)) } return opts, nil } type readerWithServer struct { sdkmetric.Reader server *http.Server } func (rws readerWithServer) Shutdown(ctx context.Context) error { return errors.Join( rws.Reader.Shutdown(ctx), rws.server.Shutdown(ctx), ) } func view(v View) (sdkmetric.View, error) { if v.Selector == nil { return nil, errors.New("view: no selector provided") } inst, err := instrument(*v.Selector) if err != nil { return nil, err } s, err := stream(v.Stream) if err != nil { return nil, err } return sdkmetric.NewView(inst, s), nil } func instrument(vs ViewSelector) (sdkmetric.Instrument, error) { kind, err := instrumentKind(vs.InstrumentType) if err != nil { return sdkmetric.Instrument{}, fmt.Errorf("view_selector: %w", err) } inst := sdkmetric.Instrument{ Name: strOrEmpty(vs.InstrumentName), Unit: strOrEmpty(vs.Unit), Kind: kind, Scope: instrumentation.Scope{ Name: strOrEmpty(vs.MeterName), Version: strOrEmpty(vs.MeterVersion), SchemaURL: strOrEmpty(vs.MeterSchemaUrl), }, } if instrumentIsEmpty(inst) { return sdkmetric.Instrument{}, errors.New("view_selector: empty selector not supporter") } return inst, nil } func stream(vs *ViewStream) (sdkmetric.Stream, error) { if vs == nil { return sdkmetric.Stream{}, nil } f, err := newIncludeExcludeFilter(vs.AttributeKeys) if err != nil { return sdkmetric.Stream{}, err } return sdkmetric.Stream{ Name: strOrEmpty(vs.Name), Description: strOrEmpty(vs.Description), Aggregation: aggregation(vs.Aggregation), AttributeFilter: f, }, nil } func aggregation(aggr *ViewStreamAggregation) sdkmetric.Aggregation { if aggr == nil { return nil } if aggr.Base2ExponentialBucketHistogram != nil { return sdkmetric.AggregationBase2ExponentialHistogram{ MaxSize: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxSize), MaxScale: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxScale), // Need to negate because config has the positive action RecordMinMax. NoMinMax: !boolOrFalse(aggr.Base2ExponentialBucketHistogram.RecordMinMax), } } if aggr.Default != nil { // TODO: Understand what to set here. return nil } if aggr.Drop != nil { return sdkmetric.AggregationDrop{} } if aggr.ExplicitBucketHistogram != nil { return sdkmetric.AggregationExplicitBucketHistogram{ Boundaries: aggr.ExplicitBucketHistogram.Boundaries, // Need to negate because config has the positive action RecordMinMax. NoMinMax: !boolOrFalse(aggr.ExplicitBucketHistogram.RecordMinMax), } } if aggr.LastValue != nil { return sdkmetric.AggregationLastValue{} } if aggr.Sum != nil { return sdkmetric.AggregationSum{} } return nil } func instrumentKind(vsit *ViewSelectorInstrumentType) (sdkmetric.InstrumentKind, error) { if vsit == nil { // Equivalent to instrumentKindUndefined. 
return instrumentKindUndefined, nil } switch *vsit { case ViewSelectorInstrumentTypeCounter: return sdkmetric.InstrumentKindCounter, nil case ViewSelectorInstrumentTypeUpDownCounter: return sdkmetric.InstrumentKindUpDownCounter, nil case ViewSelectorInstrumentTypeHistogram: return sdkmetric.InstrumentKindHistogram, nil case ViewSelectorInstrumentTypeObservableCounter: return sdkmetric.InstrumentKindObservableCounter, nil case ViewSelectorInstrumentTypeObservableUpDownCounter: return sdkmetric.InstrumentKindObservableUpDownCounter, nil case ViewSelectorInstrumentTypeObservableGauge: return sdkmetric.InstrumentKindObservableGauge, nil } return instrumentKindUndefined, errors.New("instrument_type: invalid value") } func instrumentIsEmpty(i sdkmetric.Instrument) bool { return i.Name == "" && i.Description == "" && i.Kind == instrumentKindUndefined && i.Unit == "" && i.Scope == zeroScope } func boolOrFalse(pBool *bool) bool { if pBool == nil { return false } return *pBool } func int32OrZero(pInt *int) int32 { if pInt == nil { return 0 } i := *pInt if i > math.MaxInt32 { return math.MaxInt32 } if i < math.MinInt32 { return math.MinInt32 } return int32(i) } func strOrEmpty(pStr *string) string { if pStr == nil { return "" } return *pStr }
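// Illustrative sketch (not part of the package sources): a meter provider
// configuration exercising the reader, temporality, and view code paths above.
// It can be fed through ParseYAML and NewSDK as in the earlier sketches. The
// YAML keys follow the upstream opentelemetry-configuration schema and are
// assumptions here.
package otelconf_test

const metricConfigYAML = `
file_format: "0.3"
meter_provider:
  readers:
    - periodic:
        interval: 60000
        exporter:
          otlp:
            protocol: grpc
            endpoint: http://localhost:4317
            temporality_preference: delta
  views:
    - selector:
        instrument_name: http.server.request.duration
        instrument_type: histogram
      stream:
        aggregation:
          explicit_bucket_histogram:
            boundaries: [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1]
            record_min_max: true
`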
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"

import (
	"fmt"
	"strconv"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/resource"
)

func keyVal(k string, v any) attribute.KeyValue {
	switch val := v.(type) {
	case bool:
		return attribute.Bool(k, val)
	case int64:
		return attribute.Int64(k, val)
	case uint64:
		return attribute.String(k, strconv.FormatUint(val, 10))
	case float64:
		return attribute.Float64(k, val)
	case int8:
		return attribute.Int64(k, int64(val))
	case uint8:
		return attribute.Int64(k, int64(val))
	case int16:
		return attribute.Int64(k, int64(val))
	case uint16:
		return attribute.Int64(k, int64(val))
	case int32:
		return attribute.Int64(k, int64(val))
	case uint32:
		return attribute.Int64(k, int64(val))
	case float32:
		return attribute.Float64(k, float64(val))
	case int:
		return attribute.Int(k, val)
	case uint:
		return attribute.String(k, strconv.FormatUint(uint64(val), 10))
	case string:
		return attribute.String(k, val)
	default:
		return attribute.String(k, fmt.Sprint(v))
	}
}

func newResource(res *Resource) *resource.Resource {
	if res == nil {
		return resource.Default()
	}
	var attrs []attribute.KeyValue
	for _, v := range res.Attributes {
		attrs = append(attrs, keyVal(v.Name, v.Value))
	}
	if res.SchemaUrl == nil {
		return resource.NewSchemaless(attrs...)
	}
	return resource.NewWithAttributes(*res.SchemaUrl, attrs...)
}
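// Illustrative sketch (not part of the package sources): how resource
// attributes from the configuration model map onto attribute types via keyVal
// above. Strings and bools map directly, ints use attribute.Int, and uint or
// uint64 values are rendered as strings because they may overflow int64. The
// keys and schema URL below are assumptions following the upstream schema.
package otelconf_test

const resourceConfigYAML = `
file_format: "0.3"
resource:
  schema_url: https://opentelemetry.io/schemas/1.26.0
  attributes:
    - name: service.name        # string -> attribute.String
      value: checkout
    - name: deployment.replica  # int    -> attribute.Int
      value: 3
      type: int
    - name: canary              # bool   -> attribute.Bool
      value: true
      type: bool
`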
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0" import ( "context" "errors" "fmt" "net/url" "time" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/grpc/credentials" ) var errInvalidSamplerConfiguration = errors.New("invalid sampler configuration") func tracerProvider(cfg configOptions, res *resource.Resource) (trace.TracerProvider, shutdownFunc, error) { if cfg.opentelemetryConfig.TracerProvider == nil { return noop.NewTracerProvider(), noopShutdown, nil } opts := append(cfg.tracerProviderOptions, sdktrace.WithResource(res)) var errs []error for _, processor := range cfg.opentelemetryConfig.TracerProvider.Processors { sp, err := spanProcessor(cfg.ctx, processor) if err == nil { opts = append(opts, sdktrace.WithSpanProcessor(sp)) } else { errs = append(errs, err) } } if s, err := sampler(cfg.opentelemetryConfig.TracerProvider.Sampler); err == nil { opts = append(opts, sdktrace.WithSampler(s)) } else { errs = append(errs, err) } if len(errs) > 0 { return noop.NewTracerProvider(), noopShutdown, errors.Join(errs...) } tp := sdktrace.NewTracerProvider(opts...) return tp, tp.Shutdown, nil } func parentBasedSampler(s *SamplerParentBased) (sdktrace.Sampler, error) { var rootSampler sdktrace.Sampler var opts []sdktrace.ParentBasedSamplerOption var errs []error var err error if s.Root == nil { rootSampler = sdktrace.AlwaysSample() } else { rootSampler, err = sampler(s.Root) if err != nil { errs = append(errs, err) } } if s.RemoteParentSampled != nil { remoteParentSampler, err := sampler(s.RemoteParentSampled) if err != nil { errs = append(errs, err) } else { opts = append(opts, sdktrace.WithRemoteParentSampled(remoteParentSampler)) } } if s.RemoteParentNotSampled != nil { remoteParentNotSampler, err := sampler(s.RemoteParentNotSampled) if err != nil { errs = append(errs, err) } else { opts = append(opts, sdktrace.WithRemoteParentNotSampled(remoteParentNotSampler)) } } if s.LocalParentSampled != nil { localParentSampler, err := sampler(s.LocalParentSampled) if err != nil { errs = append(errs, err) } else { opts = append(opts, sdktrace.WithLocalParentSampled(localParentSampler)) } } if s.LocalParentNotSampled != nil { localParentNotSampler, err := sampler(s.LocalParentNotSampled) if err != nil { errs = append(errs, err) } else { opts = append(opts, sdktrace.WithLocalParentNotSampled(localParentNotSampler)) } } if len(errs) > 0 { return nil, errors.Join(errs...) } return sdktrace.ParentBased(rootSampler, opts...), nil } func sampler(s *Sampler) (sdktrace.Sampler, error) { if s == nil { // If omitted, parent based sampler with a root of always_on is used. 
return sdktrace.ParentBased(sdktrace.AlwaysSample()), nil } if s.ParentBased != nil { return parentBasedSampler(s.ParentBased) } if s.AlwaysOff != nil { return sdktrace.NeverSample(), nil } if s.AlwaysOn != nil { return sdktrace.AlwaysSample(), nil } if s.TraceIDRatioBased != nil { if s.TraceIDRatioBased.Ratio == nil { return sdktrace.TraceIDRatioBased(1), nil } return sdktrace.TraceIDRatioBased(*s.TraceIDRatioBased.Ratio), nil } return nil, errInvalidSamplerConfiguration } func spanExporter(ctx context.Context, exporter SpanExporter) (sdktrace.SpanExporter, error) { if exporter.Console != nil && exporter.OTLP != nil { return nil, errors.New("must not specify multiple exporters") } if exporter.Console != nil { return stdouttrace.New( stdouttrace.WithPrettyPrint(), ) } if exporter.OTLP != nil && exporter.OTLP.Protocol != nil { switch *exporter.OTLP.Protocol { case protocolProtobufHTTP: return otlpHTTPSpanExporter(ctx, exporter.OTLP) case protocolProtobufGRPC: return otlpGRPCSpanExporter(ctx, exporter.OTLP) default: return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol) } } return nil, errors.New("no valid span exporter") } func spanProcessor(ctx context.Context, processor SpanProcessor) (sdktrace.SpanProcessor, error) { if processor.Batch != nil && processor.Simple != nil { return nil, errors.New("must not specify multiple span processor type") } if processor.Batch != nil { exp, err := spanExporter(ctx, processor.Batch.Exporter) if err != nil { return nil, err } return batchSpanProcessor(processor.Batch, exp) } if processor.Simple != nil { exp, err := spanExporter(ctx, processor.Simple.Exporter) if err != nil { return nil, err } return sdktrace.NewSimpleSpanProcessor(exp), nil } return nil, errors.New("unsupported span processor type, must be one of simple or batch") } func otlpGRPCSpanExporter(ctx context.Context, otlpConfig *OTLP) (sdktrace.SpanExporter, error) { var opts []otlptracegrpc.Option if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } // ParseRequestURI leaves the Host field empty when no // scheme is specified (i.e. localhost:4317). This check is // here to support the case where a user may not specify a // scheme. The code does its best effort here by using // otlpConfig.Endpoint as-is in that case. 
if u.Host != "" { opts = append(opts, otlptracegrpc.WithEndpoint(u.Host)) } else { opts = append(opts, otlptracegrpc.WithEndpoint(*otlpConfig.Endpoint)) } if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) { opts = append(opts, otlptracegrpc.WithInsecure()) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlptracegrpc.WithCompressor(*otlpConfig.Compression)) case compressionNone: // none requires no options default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { opts = append(opts, otlptracegrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlptracegrpc.WithHeaders(headersConfig)) } if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil { tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) } return otlptracegrpc.New(ctx, opts...) } func otlpHTTPSpanExporter(ctx context.Context, otlpConfig *OTLP) (sdktrace.SpanExporter, error) { var opts []otlptracehttp.Option if otlpConfig.Endpoint != nil { u, err := url.ParseRequestURI(*otlpConfig.Endpoint) if err != nil { return nil, err } opts = append(opts, otlptracehttp.WithEndpoint(u.Host)) if u.Scheme == "http" { opts = append(opts, otlptracehttp.WithInsecure()) } if u.Path != "" { opts = append(opts, otlptracehttp.WithURLPath(u.Path)) } } if otlpConfig.Compression != nil { switch *otlpConfig.Compression { case compressionGzip: opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.GzipCompression)) case compressionNone: opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.NoCompression)) default: return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression) } } if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 { opts = append(opts, otlptracehttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout))) } headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList) if err != nil { return nil, err } if len(headersConfig) > 0 { opts = append(opts, otlptracehttp.WithHeaders(headersConfig)) } tlsConfig, err := createTLSConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey) if err != nil { return nil, err } opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig)) return otlptracehttp.New(ctx, opts...) 
} func batchSpanProcessor(bsp *BatchSpanProcessor, exp sdktrace.SpanExporter) (sdktrace.SpanProcessor, error) { var opts []sdktrace.BatchSpanProcessorOption if bsp.ExportTimeout != nil { if *bsp.ExportTimeout < 0 { return nil, fmt.Errorf("invalid export timeout %d", *bsp.ExportTimeout) } opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*bsp.ExportTimeout))) } if bsp.MaxExportBatchSize != nil { if *bsp.MaxExportBatchSize < 0 { return nil, fmt.Errorf("invalid batch size %d", *bsp.MaxExportBatchSize) } opts = append(opts, sdktrace.WithMaxExportBatchSize(*bsp.MaxExportBatchSize)) } if bsp.MaxQueueSize != nil { if *bsp.MaxQueueSize < 0 { return nil, fmt.Errorf("invalid queue size %d", *bsp.MaxQueueSize) } opts = append(opts, sdktrace.WithMaxQueueSize(*bsp.MaxQueueSize)) } if bsp.ScheduleDelay != nil { if *bsp.ScheduleDelay < 0 { return nil, fmt.Errorf("invalid schedule delay %d", *bsp.ScheduleDelay) } opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*bsp.ScheduleDelay))) } return sdktrace.NewBatchSpanProcessor(exp, opts...), nil }
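// Illustrative sketch (not part of the package sources): a tracer provider
// configuration exercising the sampler and batch span processor code paths
// above, a parent-based sampler with a trace-ID-ratio root plus a batch
// processor over OTLP gRPC. The YAML keys follow the upstream
// opentelemetry-configuration schema and are assumptions here.
package otelconf_test

const traceConfigYAML = `
file_format: "0.3"
tracer_provider:
  sampler:
    parent_based:
      root:
        trace_id_ratio_based:
          ratio: 0.10
  processors:
    - batch:
        schedule_delay: 5000
        max_queue_size: 2048
        exporter:
          otlp:
            protocol: grpc
            endpoint: http://localhost:4317
`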