// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package kv contains function to translate name value pairs into
// attribute.KeyValue.
package kv // import "go.opentelemetry.io/contrib/otelconf/internal/kv"
import (
"fmt"
"strconv"
"go.opentelemetry.io/otel/attribute"
)
func FromNameValue(k string, v any) attribute.KeyValue {
switch val := v.(type) {
case bool:
return attribute.Bool(k, val)
case int64:
return attribute.Int64(k, val)
case uint64:
return attribute.String(k, strconv.FormatUint(val, 10))
case float64:
return attribute.Float64(k, val)
case int8:
return attribute.Int64(k, int64(val))
case uint8:
return attribute.Int64(k, int64(val))
case int16:
return attribute.Int64(k, int64(val))
case uint16:
return attribute.Int64(k, int64(val))
case int32:
return attribute.Int64(k, int64(val))
case uint32:
return attribute.Int64(k, int64(val))
case float32:
return attribute.Float64(k, float64(val))
case int:
return attribute.Int(k, val)
case uint:
return attribute.String(k, strconv.FormatUint(uint64(val), 10))
case string:
return attribute.String(k, val)
default:
return attribute.String(k, fmt.Sprint(v))
}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package provider contains various providers
// used to replace variables in configuration files.
package provider // import "go.opentelemetry.io/contrib/otelconf/internal/provider"
import (
"fmt"
"os"
"regexp"
"strings"
"time"
"go.yaml.in/yaml/v3"
)
const validationPattern = `^[a-zA-Z_][a-zA-Z0-9_]*$`
var (
validationRegexp = regexp.MustCompile(validationPattern)
doubleDollarSignsRegexp = regexp.MustCompile(`\$\$([^{$])`)
envVarRegexp = regexp.MustCompile(`([$]*)\{([a-zA-Z_][a-zA-Z0-9_]*-?[^}]*)\}`)
)
func ReplaceEnvVars(input []byte) ([]byte, error) {
// start by replacing all $$ that are not followed by a {
out := doubleDollarSignsRegexp.ReplaceAllFunc(input, func(s []byte) []byte {
return append([]byte("$"), doubleDollarSignsRegexp.FindSubmatch(s)[1]...)
})
var err error
out = envVarRegexp.ReplaceAllFunc(out, func(s []byte) []byte {
match := envVarRegexp.FindSubmatch(s)
var data []byte
// check if we have an odd number of $, which indicates that
// env var replacement should be done
dollarSigns := match[1]
if len(match) > 2 && (len(dollarSigns)%2 == 1) {
data, err = replaceEnvVar(string(match[2]))
if err != nil {
return data
}
if len(dollarSigns) > 1 {
data = append(dollarSigns[0:(len(dollarSigns)/2)], data...)
}
} else {
// need to expand any default value env var to support the case $${STRING_VALUE:-${STRING_VALUE}}
_, defaultValue := parseEnvVar(string(match[2]))
if !defaultValue.valid || !strings.Contains(defaultValue.data, "$") {
return fmt.Appendf(dollarSigns[0:(len(dollarSigns)/2)], "{%s}", match[2])
}
// expand the default value
data, err = ReplaceEnvVars(append(match[2], byte('}')))
if err != nil {
return data
}
data = fmt.Appendf(dollarSigns[0:(len(dollarSigns)/2)], "{%s", data)
}
return data
})
if err != nil {
return nil, err
}
return out, nil
}
func replaceEnvVar(in string) ([]byte, error) {
envVarName, defaultValue := parseEnvVar(in)
if strings.Contains(envVarName, ":") {
return nil, fmt.Errorf("invalid environment variable name: %s", envVarName)
}
if !validationRegexp.MatchString(envVarName) {
return nil, fmt.Errorf("invalid environment variable name: %s", envVarName)
}
val := os.Getenv(envVarName)
if val == "" && defaultValue.valid {
val = strings.ReplaceAll(defaultValue.data, "$$", "$")
}
if val == "" {
return nil, nil
}
out := []byte(val)
if err := checkRawConfType(out); err != nil {
return nil, fmt.Errorf("invalid value type: %w", err)
}
return out, nil
}
type defaultValue struct {
data string
valid bool
}
func parseEnvVar(in string) (string, defaultValue) {
in = strings.TrimPrefix(in, "env:")
const sep = ":-"
if i := strings.Index(in, sep); i >= 0 {
return in[:i], defaultValue{data: in[i+len(sep):], valid: true}
}
return in, defaultValue{}
}
func checkRawConfType(val []byte) error {
var rawConf any
err := yaml.Unmarshal(val, &rawConf)
if err != nil {
return err
}
switch rawConf.(type) {
case int, int32, int64, float32, float64, bool, string, time.Time:
return nil
default:
return fmt.Errorf(
"unsupported type=%T for retrieved config,"+
" ensure that values are wrapped in quotes", rawConf)
}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package tls provides functionality to translate configuration options into tls.Config.
package tls // import "go.opentelemetry.io/contrib/otelconf/internal/tls"
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
)
// CreateConfig creates a tls.Config from certificate files.
func CreateConfig(caCertFile, clientCertFile, clientKeyFile *string) (*tls.Config, error) {
tlsConfig := &tls.Config{}
if caCertFile != nil {
caText, err := os.ReadFile(*caCertFile)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caText) {
return nil, errors.New("could not create certificate authority chain from certificate")
}
tlsConfig.RootCAs = certPool
}
if clientCertFile != nil {
if clientKeyFile == nil {
return nil, errors.New("client certificate was provided but no client key was provided")
}
clientCert, err := tls.LoadX509KeyPair(*clientCertFile, *clientKeyFile)
if err != nil {
return nil, fmt.Errorf("could not use client certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{clientCert}
}
return tlsConfig, nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package otelconf provides an OpenTelemetry declarative configuration SDK.
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/log"
nooplog "go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
nooptrace "go.opentelemetry.io/otel/trace/noop"
yaml "go.yaml.in/yaml/v3"
"go.opentelemetry.io/contrib/otelconf/internal/provider"
)
const (
protocolProtobufHTTP = "http/protobuf"
protocolProtobufGRPC = "grpc"
compressionGzip = "gzip"
compressionNone = "none"
)
type configOptions struct {
ctx context.Context
opentelemetryConfig OpenTelemetryConfiguration
loggerProviderOptions []sdklog.LoggerProviderOption
meterProviderOptions []sdkmetric.Option
tracerProviderOptions []sdktrace.TracerProviderOption
}
type shutdownFunc func(context.Context) error
func noopShutdown(context.Context) error {
return nil
}
// SDK is a struct that contains all the providers
// configured via the configuration model.
type SDK struct {
meterProvider metric.MeterProvider
tracerProvider trace.TracerProvider
loggerProvider log.LoggerProvider
shutdown shutdownFunc
}
// TracerProvider returns a configured trace.TracerProvider.
func (s *SDK) TracerProvider() trace.TracerProvider {
return s.tracerProvider
}
// MeterProvider returns a configured metric.MeterProvider.
func (s *SDK) MeterProvider() metric.MeterProvider {
return s.meterProvider
}
// LoggerProvider returns a configured log.LoggerProvider.
func (s *SDK) LoggerProvider() log.LoggerProvider {
return s.loggerProvider
}
// Shutdown calls shutdown on all configured providers.
func (s *SDK) Shutdown(ctx context.Context) error {
return s.shutdown(ctx)
}
var noopSDK = SDK{
loggerProvider: nooplog.LoggerProvider{},
meterProvider: noopmetric.MeterProvider{},
tracerProvider: nooptrace.TracerProvider{},
shutdown: func(context.Context) error { return nil },
}
// NewSDK creates SDK providers based on the configuration model.
func NewSDK(opts ...ConfigurationOption) (SDK, error) {
o := configOptions{
ctx: context.Background(),
}
for _, opt := range opts {
o = opt.apply(o)
}
if o.opentelemetryConfig.Disabled != nil && *o.opentelemetryConfig.Disabled {
return noopSDK, nil
}
r := newResource(o.opentelemetryConfig.Resource)
mp, mpShutdown, err := meterProvider(o, r)
if err != nil {
return noopSDK, err
}
tp, tpShutdown, err := tracerProvider(o, r)
if err != nil {
return noopSDK, err
}
lp, lpShutdown, err := loggerProvider(o, r)
if err != nil {
return noopSDK, err
}
return SDK{
meterProvider: mp,
tracerProvider: tp,
loggerProvider: lp,
shutdown: func(ctx context.Context) error {
return errors.Join(mpShutdown(ctx), tpShutdown(ctx), lpShutdown(ctx))
},
}, nil
}
// ConfigurationOption configures options for providers.
type ConfigurationOption interface {
apply(configOptions) configOptions
}
type configurationOptionFunc func(configOptions) configOptions
func (fn configurationOptionFunc) apply(cfg configOptions) configOptions {
return fn(cfg)
}
// WithContext sets the context.Context for the SDK.
func WithContext(ctx context.Context) ConfigurationOption {
return configurationOptionFunc(func(c configOptions) configOptions {
c.ctx = ctx
return c
})
}
// WithOpenTelemetryConfiguration sets the OpenTelemetryConfiguration used
// to produce the SDK.
func WithOpenTelemetryConfiguration(cfg OpenTelemetryConfiguration) ConfigurationOption {
return configurationOptionFunc(func(c configOptions) configOptions {
c.opentelemetryConfig = cfg
return c
})
}
// WithLoggerProviderOptions appends LoggerProviderOptions used for constructing
// the LoggerProvider. OpenTelemetryConfiguration takes precedence over these options.
func WithLoggerProviderOptions(opts ...sdklog.LoggerProviderOption) ConfigurationOption {
return configurationOptionFunc(func(c configOptions) configOptions {
c.loggerProviderOptions = append(c.loggerProviderOptions, opts...)
return c
})
}
// WithMeterProviderOptions appends metric.Options used for constructing the
// MeterProvider. OpenTelemetryConfiguration takes precedence over these options.
func WithMeterProviderOptions(opts ...sdkmetric.Option) ConfigurationOption {
return configurationOptionFunc(func(c configOptions) configOptions {
c.meterProviderOptions = append(c.meterProviderOptions, opts...)
return c
})
}
// WithTracerProviderOptions appends TracerProviderOptions used for constructing
// the TracerProvider. OpenTelemetryConfiguration takes precedence over these options.
func WithTracerProviderOptions(opts ...sdktrace.TracerProviderOption) ConfigurationOption {
return configurationOptionFunc(func(c configOptions) configOptions {
c.tracerProviderOptions = append(c.tracerProviderOptions, opts...)
return c
})
}
// ParseYAML parses a YAML configuration file into an OpenTelemetryConfiguration.
func ParseYAML(file []byte) (*OpenTelemetryConfiguration, error) {
file, err := provider.ReplaceEnvVars(file)
if err != nil {
return nil, err
}
var cfg OpenTelemetryConfiguration
err = yaml.Unmarshal(file, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
// createHeadersConfig combines the two header config fields. Headers take precedence over headersList.
func createHeadersConfig(headers []NameStringValuePair, headersList *string) (map[string]string, error) {
result := make(map[string]string)
if headersList != nil {
// Parsing follows https://github.com/open-telemetry/opentelemetry-configuration/blob/568e5080816d40d75792eb754fc96bde09654159/schema/type_descriptions.yaml#L584.
headerslist, err := baggage.Parse(*headersList)
if err != nil {
return nil, fmt.Errorf("invalid headers list: %w", err)
}
for _, kv := range headerslist.Members() {
result[kv.Key()] = kv.Value()
}
}
// Headers take precedence over HeadersList, so this has to be after HeadersList is processed
if len(headers) > 0 {
for _, kv := range headers {
if kv.Value != nil {
result[kv.Name] = *kv.Value
}
}
}
return result, nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"encoding/json"
"errors"
"fmt"
"reflect"
)
// MarshalJSON implements json.Marshaler.
func (j *AttributeNameValueType) MarshalJSON() ([]byte, error) {
return json.Marshal(j.Value)
}
var enumValuesAttributeNameValueType = []any{
nil,
"string",
"bool",
"int",
"double",
"string_array",
"bool_array",
"int_array",
"double_array",
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *AttributeNameValueType) UnmarshalJSON(b []byte) error {
var v struct {
Value any
}
if err := json.Unmarshal(b, &v.Value); err != nil {
return err
}
var ok bool
for _, expected := range enumValuesAttributeNameValueType {
if reflect.DeepEqual(v.Value, expected) {
ok = true
break
}
}
if !ok {
return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesAttributeNameValueType, v.Value)
}
*j = AttributeNameValueType(v)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *BatchLogRecordProcessor) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in BatchLogRecordProcessor: required")
}
type Plain BatchLogRecordProcessor
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = BatchLogRecordProcessor(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *BatchSpanProcessor) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in BatchSpanProcessor: required")
}
type Plain BatchSpanProcessor
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = BatchSpanProcessor(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *GeneralInstrumentationPeerServiceMappingElem) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["peer"]; raw != nil && !ok {
return errors.New("field peer in GeneralInstrumentationPeerServiceMappingElem: required")
}
if _, ok := raw["service"]; raw != nil && !ok {
return errors.New("field service in GeneralInstrumentationPeerServiceMappingElem: required")
}
type Plain GeneralInstrumentationPeerServiceMappingElem
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = GeneralInstrumentationPeerServiceMappingElem(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *NameStringValuePair) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["name"]; !ok {
return errors.New("json: cannot unmarshal field name in NameStringValuePair required")
}
if _, ok := raw["value"]; !ok {
return errors.New("json: cannot unmarshal field value in NameStringValuePair required")
}
var name, value string
var ok bool
if name, ok = raw["name"].(string); !ok {
return errors.New("yaml: cannot unmarshal field name in NameStringValuePair must be string")
}
if value, ok = raw["value"].(string); !ok {
return errors.New("yaml: cannot unmarshal field value in NameStringValuePair must be string")
}
*j = NameStringValuePair{
Name: name,
Value: &value,
}
return nil
}
var enumValuesOTLPMetricDefaultHistogramAggregation = []any{
"explicit_bucket_histogram",
"base2_exponential_bucket_histogram",
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *OTLPMetricDefaultHistogramAggregation) UnmarshalJSON(b []byte) error {
var v string
if err := json.Unmarshal(b, &v); err != nil {
return err
}
var ok bool
for _, expected := range enumValuesOTLPMetricDefaultHistogramAggregation {
if reflect.DeepEqual(v, expected) {
ok = true
break
}
}
if !ok {
return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesOTLPMetricDefaultHistogramAggregation, v)
}
*j = OTLPMetricDefaultHistogramAggregation(v)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *OTLPMetric) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["endpoint"]; raw != nil && !ok {
return errors.New("field endpoint in OTLPMetric: required")
}
if _, ok := raw["protocol"]; raw != nil && !ok {
return errors.New("field protocol in OTLPMetric: required")
}
type Plain OTLPMetric
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = OTLPMetric(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *OTLP) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["endpoint"]; raw != nil && !ok {
return errors.New("field endpoint in OTLP: required")
}
if _, ok := raw["protocol"]; raw != nil && !ok {
return errors.New("field protocol in OTLP: required")
}
type Plain OTLP
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = OTLP(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *OpenTelemetryConfiguration) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["file_format"]; raw != nil && !ok {
return errors.New("field file_format in OpenTelemetryConfiguration: required")
}
type Plain OpenTelemetryConfiguration
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = OpenTelemetryConfiguration(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *PeriodicMetricReader) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in PeriodicMetricReader: required")
}
type Plain PeriodicMetricReader
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = PeriodicMetricReader(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *PullMetricReader) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in PullMetricReader: required")
}
type Plain PullMetricReader
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = PullMetricReader(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *SimpleLogRecordProcessor) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in SimpleLogRecordProcessor: required")
}
type Plain SimpleLogRecordProcessor
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = SimpleLogRecordProcessor(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *SimpleSpanProcessor) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["exporter"]; raw != nil && !ok {
return errors.New("field exporter in SimpleSpanProcessor: required")
}
type Plain SimpleSpanProcessor
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = SimpleSpanProcessor(plain)
return nil
}
var enumValuesViewSelectorInstrumentType = []any{
"counter",
"histogram",
"observable_counter",
"observable_gauge",
"observable_up_down_counter",
"up_down_counter",
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *ViewSelectorInstrumentType) UnmarshalJSON(b []byte) error {
var v string
if err := json.Unmarshal(b, &v); err != nil {
return err
}
var ok bool
for _, expected := range enumValuesViewSelectorInstrumentType {
if reflect.DeepEqual(v, expected) {
ok = true
break
}
}
if !ok {
return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesViewSelectorInstrumentType, v)
}
*j = ViewSelectorInstrumentType(v)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *Zipkin) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["endpoint"]; raw != nil && !ok {
return errors.New("field endpoint in Zipkin: required")
}
type Plain Zipkin
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
*j = Zipkin(plain)
return nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *AttributeNameValue) UnmarshalJSON(b []byte) error {
var raw map[string]any
if err := json.Unmarshal(b, &raw); err != nil {
return err
}
if _, ok := raw["name"]; raw != nil && !ok {
return errors.New("field name in AttributeNameValue: required")
}
if _, ok := raw["value"]; raw != nil && !ok {
return errors.New("field value in AttributeNameValue: required")
}
type Plain AttributeNameValue
var plain Plain
if err := json.Unmarshal(b, &plain); err != nil {
return err
}
if plain.Type != nil && plain.Type.Value == "int" {
val, ok := plain.Value.(float64)
if ok {
plain.Value = int(val)
}
}
if plain.Type != nil && plain.Type.Value == "int_array" {
m, ok := plain.Value.([]any)
if ok {
var vals []any
for _, v := range m {
val, ok := v.(float64)
if ok {
vals = append(vals, int(val))
} else {
vals = append(vals, val)
}
}
plain.Value = vals
}
}
*j = AttributeNameValue(plain)
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"errors"
"fmt"
"reflect"
)
// UnmarshalYAML implements yaml.Unmarshaler.
func (j *AttributeNameValueType) UnmarshalYAML(unmarshal func(any) error) error {
var v struct {
Value any
}
if err := unmarshal(&v.Value); err != nil {
return err
}
var ok bool
for _, expected := range enumValuesAttributeNameValueType {
if reflect.DeepEqual(v.Value, expected) {
ok = true
break
}
}
if !ok {
return fmt.Errorf("invalid value (expected one of %#v): %#v", enumValuesAttributeNameValueType, v.Value)
}
*j = AttributeNameValueType(v)
return nil
}
// UnmarshalYAML implements yaml.Unmarshaler.
func (j *NameStringValuePair) UnmarshalYAML(unmarshal func(any) error) error {
var raw map[string]any
if err := unmarshal(&raw); err != nil {
return err
}
if _, ok := raw["name"]; !ok {
return errors.New("yaml: cannot unmarshal field name in NameStringValuePair required")
}
if _, ok := raw["value"]; !ok {
return errors.New("yaml: cannot unmarshal field value in NameStringValuePair required")
}
var name, value string
var ok bool
if name, ok = raw["name"].(string); !ok {
return errors.New("yaml: cannot unmarshal field name in NameStringValuePair must be string")
}
if value, ok = raw["value"].(string); !ok {
return errors.New("yaml: cannot unmarshal field value in NameStringValuePair must be string")
}
*j = NameStringValuePair{
Name: name,
Value: &value,
}
return nil
}
// UnmarshalYAML implements yaml.Unmarshaler.
func (j *LanguageSpecificInstrumentation) UnmarshalYAML(unmarshal func(any) error) error {
var raw map[string]any
if err := unmarshal(&raw); err != nil {
return err
}
*j = raw
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/noop"
sdklog "go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/contrib/otelconf/internal/tls"
)
func loggerProvider(cfg configOptions, res *resource.Resource) (log.LoggerProvider, shutdownFunc, error) {
if cfg.opentelemetryConfig.LoggerProvider == nil {
return noop.NewLoggerProvider(), noopShutdown, nil
}
opts := append(cfg.loggerProviderOptions, sdklog.WithResource(res))
var errs []error
for _, processor := range cfg.opentelemetryConfig.LoggerProvider.Processors {
sp, err := logProcessor(cfg.ctx, processor)
if err == nil {
opts = append(opts, sdklog.WithProcessor(sp))
} else {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return noop.NewLoggerProvider(), noopShutdown, errors.Join(errs...)
}
lp := sdklog.NewLoggerProvider(opts...)
return lp, lp.Shutdown, nil
}
func logProcessor(ctx context.Context, processor LogRecordProcessor) (sdklog.Processor, error) {
if processor.Batch != nil && processor.Simple != nil {
return nil, errors.New("must not specify multiple log processor type")
}
if processor.Batch != nil {
exp, err := logExporter(ctx, processor.Batch.Exporter)
if err != nil {
return nil, err
}
return batchLogProcessor(processor.Batch, exp)
}
if processor.Simple != nil {
exp, err := logExporter(ctx, processor.Simple.Exporter)
if err != nil {
return nil, err
}
return sdklog.NewSimpleProcessor(exp), nil
}
return nil, errors.New("unsupported log processor type, must be one of simple or batch")
}
func logExporter(ctx context.Context, exporter LogRecordExporter) (sdklog.Exporter, error) {
if exporter.Console != nil && exporter.OTLP != nil {
return nil, errors.New("must not specify multiple exporters")
}
if exporter.Console != nil {
return stdoutlog.New(
stdoutlog.WithPrettyPrint(),
)
}
if exporter.OTLP != nil && exporter.OTLP.Protocol != nil {
switch *exporter.OTLP.Protocol {
case protocolProtobufHTTP:
return otlpHTTPLogExporter(ctx, exporter.OTLP)
case protocolProtobufGRPC:
return otlpGRPCLogExporter(ctx, exporter.OTLP)
default:
return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol)
}
}
return nil, errors.New("no valid log exporter")
}
func batchLogProcessor(blp *BatchLogRecordProcessor, exp sdklog.Exporter) (*sdklog.BatchProcessor, error) {
var opts []sdklog.BatchProcessorOption
if blp.ExportTimeout != nil {
if *blp.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *blp.ExportTimeout)
}
opts = append(opts, sdklog.WithExportTimeout(time.Millisecond*time.Duration(*blp.ExportTimeout)))
}
if blp.MaxExportBatchSize != nil {
if *blp.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *blp.MaxExportBatchSize)
}
opts = append(opts, sdklog.WithExportMaxBatchSize(*blp.MaxExportBatchSize))
}
if blp.MaxQueueSize != nil {
if *blp.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *blp.MaxQueueSize)
}
opts = append(opts, sdklog.WithMaxQueueSize(*blp.MaxQueueSize))
}
if blp.ScheduleDelay != nil {
if *blp.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *blp.ScheduleDelay)
}
opts = append(opts, sdklog.WithExportInterval(time.Millisecond*time.Duration(*blp.ScheduleDelay)))
}
return sdklog.NewBatchProcessor(exp, opts...), nil
}
func otlpHTTPLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter, error) {
var opts []otlploghttp.Option
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
opts = append(opts, otlploghttp.WithEndpoint(u.Host))
if u.Scheme == "http" {
opts = append(opts, otlploghttp.WithInsecure())
}
if u.Path != "" {
opts = append(opts, otlploghttp.WithURLPath(u.Path))
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlploghttp.WithCompression(otlploghttp.GzipCompression))
case compressionNone:
opts = append(opts, otlploghttp.WithCompression(otlploghttp.NoCompression))
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlploghttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlploghttp.WithHeaders(headersConfig))
}
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlploghttp.WithTLSClientConfig(tlsConfig))
return otlploghttp.New(ctx, opts...)
}
func otlpGRPCLogExporter(ctx context.Context, otlpConfig *OTLP) (sdklog.Exporter, error) {
var opts []otlploggrpc.Option
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
// ParseRequestURI leaves the Host field empty when no
// scheme is specified (i.e. localhost:4317). This check is
// here to support the case where a user may not specify a
// scheme. The code does its best effort here by using
// otlpConfig.Endpoint as-is in that case
if u.Host != "" {
opts = append(opts, otlploggrpc.WithEndpoint(u.Host))
} else {
opts = append(opts, otlploggrpc.WithEndpoint(*otlpConfig.Endpoint))
}
if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) {
opts = append(opts, otlploggrpc.WithInsecure())
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlploggrpc.WithCompressor(*otlpConfig.Compression))
case compressionNone:
// none requires no options
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlploggrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlploggrpc.WithHeaders(headersConfig))
}
if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil {
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlploggrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
return otlploggrpc.New(ctx, opts...)
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
"os"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/contrib/otelconf/internal/tls"
)
var zeroScope instrumentation.Scope
const instrumentKindUndefined = sdkmetric.InstrumentKind(0)
func meterProvider(cfg configOptions, res *resource.Resource) (metric.MeterProvider, shutdownFunc, error) {
if cfg.opentelemetryConfig.MeterProvider == nil {
return noop.NewMeterProvider(), noopShutdown, nil
}
opts := append(cfg.meterProviderOptions, sdkmetric.WithResource(res))
var errs []error
for _, reader := range cfg.opentelemetryConfig.MeterProvider.Readers {
r, err := metricReader(cfg.ctx, reader)
if err == nil {
opts = append(opts, sdkmetric.WithReader(r))
} else {
errs = append(errs, err)
}
}
for _, vw := range cfg.opentelemetryConfig.MeterProvider.Views {
v, err := view(vw)
if err == nil {
opts = append(opts, sdkmetric.WithView(v))
} else {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return noop.NewMeterProvider(), noopShutdown, errors.Join(errs...)
}
mp := sdkmetric.NewMeterProvider(opts...)
return mp, mp.Shutdown, nil
}
func metricReader(ctx context.Context, r MetricReader) (sdkmetric.Reader, error) {
if r.Periodic != nil && r.Pull != nil {
return nil, errors.New("must not specify multiple metric reader type")
}
if r.Periodic != nil {
var opts []sdkmetric.PeriodicReaderOption
if r.Periodic.Interval != nil {
opts = append(opts, sdkmetric.WithInterval(time.Duration(*r.Periodic.Interval)*time.Millisecond))
}
if r.Periodic.Timeout != nil {
opts = append(opts, sdkmetric.WithTimeout(time.Duration(*r.Periodic.Timeout)*time.Millisecond))
}
return periodicExporter(ctx, r.Periodic.Exporter, opts...)
}
if r.Pull != nil {
return pullReader(ctx, r.Pull.Exporter)
}
return nil, errors.New("no valid metric reader")
}
func pullReader(ctx context.Context, exporter PullMetricExporter) (sdkmetric.Reader, error) {
if exporter.Prometheus != nil {
return prometheusReader(ctx, exporter.Prometheus)
}
return nil, errors.New("no valid metric exporter")
}
func periodicExporter(ctx context.Context, exporter PushMetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, error) {
if exporter.Console != nil && exporter.OTLP != nil {
return nil, errors.New("must not specify multiple exporters")
}
if exporter.Console != nil {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
exp, err := stdoutmetric.New(
stdoutmetric.WithEncoder(enc),
)
if err != nil {
return nil, err
}
return sdkmetric.NewPeriodicReader(exp, opts...), nil
}
if exporter.OTLP != nil && exporter.OTLP.Protocol != nil {
var err error
var exp sdkmetric.Exporter
switch *exporter.OTLP.Protocol {
case protocolProtobufHTTP:
exp, err = otlpHTTPMetricExporter(ctx, exporter.OTLP)
case protocolProtobufGRPC:
exp, err = otlpGRPCMetricExporter(ctx, exporter.OTLP)
default:
return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol)
}
if err != nil {
return nil, err
}
return sdkmetric.NewPeriodicReader(exp, opts...), nil
}
return nil, errors.New("no valid metric exporter")
}
func otlpHTTPMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmetric.Exporter, error) {
opts := []otlpmetrichttp.Option{}
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
opts = append(opts, otlpmetrichttp.WithEndpoint(u.Host))
if u.Scheme == "http" {
opts = append(opts, otlpmetrichttp.WithInsecure())
}
if u.Path != "" {
opts = append(opts, otlpmetrichttp.WithURLPath(u.Path))
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression))
case compressionNone:
opts = append(opts, otlpmetrichttp.WithCompression(otlpmetrichttp.NoCompression))
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil {
opts = append(opts, otlpmetrichttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlpmetrichttp.WithHeaders(headersConfig))
}
if otlpConfig.TemporalityPreference != nil {
switch *otlpConfig.TemporalityPreference {
case "delta":
opts = append(opts, otlpmetrichttp.WithTemporalitySelector(deltaTemporality))
case "cumulative":
opts = append(opts, otlpmetrichttp.WithTemporalitySelector(cumulativeTemporality))
case "lowmemory":
opts = append(opts, otlpmetrichttp.WithTemporalitySelector(lowMemory))
default:
return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference)
}
}
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig))
return otlpmetrichttp.New(ctx, opts...)
}
func otlpGRPCMetricExporter(ctx context.Context, otlpConfig *OTLPMetric) (sdkmetric.Exporter, error) {
var opts []otlpmetricgrpc.Option
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
// ParseRequestURI leaves the Host field empty when no
// scheme is specified (i.e. localhost:4317). This check is
// here to support the case where a user may not specify a
// scheme. The code does its best effort here by using
// otlpConfig.Endpoint as-is in that case
if u.Host != "" {
opts = append(opts, otlpmetricgrpc.WithEndpoint(u.Host))
} else {
opts = append(opts, otlpmetricgrpc.WithEndpoint(*otlpConfig.Endpoint))
}
if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlpmetricgrpc.WithCompressor(*otlpConfig.Compression))
case compressionNone:
// none requires no options
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlpmetricgrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlpmetricgrpc.WithHeaders(headersConfig))
}
if otlpConfig.TemporalityPreference != nil {
switch *otlpConfig.TemporalityPreference {
case "delta":
opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(deltaTemporality))
case "cumulative":
opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(cumulativeTemporality))
case "lowmemory":
opts = append(opts, otlpmetricgrpc.WithTemporalitySelector(lowMemory))
default:
return nil, fmt.Errorf("unsupported temporality preference %q", *otlpConfig.TemporalityPreference)
}
}
if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil {
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
return otlpmetricgrpc.New(ctx, opts...)
}
func cumulativeTemporality(sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
func deltaTemporality(ik sdkmetric.InstrumentKind) metricdata.Temporality {
switch ik {
case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram, sdkmetric.InstrumentKindObservableCounter:
return metricdata.DeltaTemporality
default:
return metricdata.CumulativeTemporality
}
}
func lowMemory(ik sdkmetric.InstrumentKind) metricdata.Temporality {
switch ik {
case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindHistogram:
return metricdata.DeltaTemporality
default:
return metricdata.CumulativeTemporality
}
}
// newIncludeExcludeFilter returns a Filter that includes attributes
// in the include list and excludes attributes in the excludes list.
// It returns an error if an attribute is in both lists
//
// If IncludeExclude is empty a include-all filter is returned.
func newIncludeExcludeFilter(lists *IncludeExclude) (attribute.Filter, error) {
if lists == nil {
return func(attribute.KeyValue) bool { return true }, nil
}
included := make(map[attribute.Key]struct{})
for _, k := range lists.Included {
included[attribute.Key(k)] = struct{}{}
}
excluded := make(map[attribute.Key]struct{})
for _, k := range lists.Excluded {
if _, ok := included[attribute.Key(k)]; ok {
return nil, fmt.Errorf("attribute cannot be in both include and exclude list: %s", k)
}
excluded[attribute.Key(k)] = struct{}{}
}
return func(kv attribute.KeyValue) bool {
// check if a value is excluded first
if _, ok := excluded[kv.Key]; ok {
return false
}
if len(included) == 0 {
return true
}
_, ok := included[kv.Key]
return ok
}, nil
}
func prometheusReader(ctx context.Context, prometheusConfig *Prometheus) (sdkmetric.Reader, error) {
if prometheusConfig.Host == nil {
return nil, errors.New("host must be specified")
}
if prometheusConfig.Port == nil {
return nil, errors.New("port must be specified")
}
opts, err := prometheusReaderOpts(prometheusConfig)
if err != nil {
return nil, err
}
reg := prometheus.NewRegistry()
opts = append(opts, otelprom.WithRegisterer(reg))
reader, err := otelprom.New(opts...)
if err != nil {
return nil, fmt.Errorf("error creating otel prometheus exporter: %w", err)
}
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
server := http.Server{
// Timeouts are necessary to make a server resilient to attacks.
// We use values from this example: https://blog.cloudflare.com/exposing-go-on-the-internet/#:~:text=There%20are%20three%20main%20timeouts
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: mux,
}
// Remove surrounding "[]" from the host definition to allow users to define the host as "[::1]" or "::1".
host := *prometheusConfig.Host
if len(host) > 2 && host[0] == '[' && host[len(host)-1] == ']' {
host = host[1 : len(host)-1]
}
addr := net.JoinHostPort(host, strconv.Itoa(*prometheusConfig.Port))
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, errors.Join(
fmt.Errorf("binding address %s for Prometheus exporter: %w", addr, err),
reader.Shutdown(ctx),
)
}
// Only for testing reasons, add the address to the http Server, will not be used.
server.Addr = lis.Addr().String()
go func() {
if err := server.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) {
otel.Handle(fmt.Errorf("the Prometheus HTTP server exited unexpectedly: %w", err))
}
}()
return readerWithServer{reader, &server}, nil
}
func prometheusReaderOpts(prometheusConfig *Prometheus) ([]otelprom.Option, error) {
var opts []otelprom.Option
if prometheusConfig.WithoutScopeInfo != nil && *prometheusConfig.WithoutScopeInfo {
opts = append(opts, otelprom.WithoutScopeInfo())
}
if prometheusConfig.WithoutTypeSuffix != nil && *prometheusConfig.WithoutTypeSuffix {
opts = append(opts, otelprom.WithoutCounterSuffixes()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility.
}
if prometheusConfig.WithoutUnits != nil && *prometheusConfig.WithoutUnits {
opts = append(opts, otelprom.WithoutUnits()) //nolint:staticcheck // WithouTypeSuffix is deprecated, but we still need it for backwards compatibility.
}
if prometheusConfig.WithResourceConstantLabels != nil {
f, err := newIncludeExcludeFilter(prometheusConfig.WithResourceConstantLabels)
if err != nil {
return nil, err
}
opts = append(opts, otelprom.WithResourceAsConstantLabels(f))
}
return opts, nil
}
type readerWithServer struct {
sdkmetric.Reader
server *http.Server
}
func (rws readerWithServer) Shutdown(ctx context.Context) error {
return errors.Join(
rws.Reader.Shutdown(ctx),
rws.server.Shutdown(ctx),
)
}
func view(v View) (sdkmetric.View, error) {
if v.Selector == nil {
return nil, errors.New("view: no selector provided")
}
inst, err := instrument(*v.Selector)
if err != nil {
return nil, err
}
s, err := stream(v.Stream)
if err != nil {
return nil, err
}
return sdkmetric.NewView(inst, s), nil
}
func instrument(vs ViewSelector) (sdkmetric.Instrument, error) {
kind, err := instrumentKind(vs.InstrumentType)
if err != nil {
return sdkmetric.Instrument{}, fmt.Errorf("view_selector: %w", err)
}
inst := sdkmetric.Instrument{
Name: strOrEmpty(vs.InstrumentName),
Unit: strOrEmpty(vs.Unit),
Kind: kind,
Scope: instrumentation.Scope{
Name: strOrEmpty(vs.MeterName),
Version: strOrEmpty(vs.MeterVersion),
SchemaURL: strOrEmpty(vs.MeterSchemaUrl),
},
}
if instrumentIsEmpty(inst) {
return sdkmetric.Instrument{}, errors.New("view_selector: empty selector not supporter")
}
return inst, nil
}
func stream(vs *ViewStream) (sdkmetric.Stream, error) {
if vs == nil {
return sdkmetric.Stream{}, nil
}
f, err := newIncludeExcludeFilter(vs.AttributeKeys)
if err != nil {
return sdkmetric.Stream{}, err
}
return sdkmetric.Stream{
Name: strOrEmpty(vs.Name),
Description: strOrEmpty(vs.Description),
Aggregation: aggregation(vs.Aggregation),
AttributeFilter: f,
}, nil
}
func aggregation(aggr *ViewStreamAggregation) sdkmetric.Aggregation {
if aggr == nil {
return nil
}
if aggr.Base2ExponentialBucketHistogram != nil {
return sdkmetric.AggregationBase2ExponentialHistogram{
MaxSize: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxSize),
MaxScale: int32OrZero(aggr.Base2ExponentialBucketHistogram.MaxScale),
// Need to negate because config has the positive action RecordMinMax.
NoMinMax: !boolOrFalse(aggr.Base2ExponentialBucketHistogram.RecordMinMax),
}
}
if aggr.Default != nil {
// TODO: Understand what to set here.
return nil
}
if aggr.Drop != nil {
return sdkmetric.AggregationDrop{}
}
if aggr.ExplicitBucketHistogram != nil {
return sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: aggr.ExplicitBucketHistogram.Boundaries,
// Need to negate because config has the positive action RecordMinMax.
NoMinMax: !boolOrFalse(aggr.ExplicitBucketHistogram.RecordMinMax),
}
}
if aggr.LastValue != nil {
return sdkmetric.AggregationLastValue{}
}
if aggr.Sum != nil {
return sdkmetric.AggregationSum{}
}
return nil
}
func instrumentKind(vsit *ViewSelectorInstrumentType) (sdkmetric.InstrumentKind, error) {
if vsit == nil {
// Equivalent to instrumentKindUndefined.
return instrumentKindUndefined, nil
}
switch *vsit {
case ViewSelectorInstrumentTypeCounter:
return sdkmetric.InstrumentKindCounter, nil
case ViewSelectorInstrumentTypeUpDownCounter:
return sdkmetric.InstrumentKindUpDownCounter, nil
case ViewSelectorInstrumentTypeHistogram:
return sdkmetric.InstrumentKindHistogram, nil
case ViewSelectorInstrumentTypeObservableCounter:
return sdkmetric.InstrumentKindObservableCounter, nil
case ViewSelectorInstrumentTypeObservableUpDownCounter:
return sdkmetric.InstrumentKindObservableUpDownCounter, nil
case ViewSelectorInstrumentTypeObservableGauge:
return sdkmetric.InstrumentKindObservableGauge, nil
}
return instrumentKindUndefined, errors.New("instrument_type: invalid value")
}
func instrumentIsEmpty(i sdkmetric.Instrument) bool {
return i.Name == "" &&
i.Description == "" &&
i.Kind == instrumentKindUndefined &&
i.Unit == "" &&
i.Scope == zeroScope
}
func boolOrFalse(pBool *bool) bool {
if pBool == nil {
return false
}
return *pBool
}
func int32OrZero(pInt *int) int32 {
if pInt == nil {
return 0
}
i := *pInt
if i > math.MaxInt32 {
return math.MaxInt32
}
if i < math.MinInt32 {
return math.MinInt32
}
return int32(i)
}
func strOrEmpty(pStr *string) string {
if pStr == nil {
return ""
}
return *pStr
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/contrib/otelconf/internal/kv"
)
func newResource(res *Resource) *resource.Resource {
if res == nil {
return resource.Default()
}
var attrs []attribute.KeyValue
for _, v := range res.Attributes {
attrs = append(attrs, kv.FromNameValue(v.Name, v.Value))
}
if res.SchemaUrl == nil {
return resource.NewSchemaless(attrs...)
}
return resource.NewWithAttributes(*res.SchemaUrl, attrs...)
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconf // import "go.opentelemetry.io/contrib/otelconf/v0.3.0"
import (
"context"
"errors"
"fmt"
"net/url"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/contrib/otelconf/internal/tls"
)
var errInvalidSamplerConfiguration = errors.New("invalid sampler configuration")
func tracerProvider(cfg configOptions, res *resource.Resource) (trace.TracerProvider, shutdownFunc, error) {
if cfg.opentelemetryConfig.TracerProvider == nil {
return noop.NewTracerProvider(), noopShutdown, nil
}
opts := append(cfg.tracerProviderOptions, sdktrace.WithResource(res))
var errs []error
for _, processor := range cfg.opentelemetryConfig.TracerProvider.Processors {
sp, err := spanProcessor(cfg.ctx, processor)
if err == nil {
opts = append(opts, sdktrace.WithSpanProcessor(sp))
} else {
errs = append(errs, err)
}
}
if s, err := sampler(cfg.opentelemetryConfig.TracerProvider.Sampler); err == nil {
opts = append(opts, sdktrace.WithSampler(s))
} else {
errs = append(errs, err)
}
if len(errs) > 0 {
return noop.NewTracerProvider(), noopShutdown, errors.Join(errs...)
}
tp := sdktrace.NewTracerProvider(opts...)
return tp, tp.Shutdown, nil
}
func parentBasedSampler(s *SamplerParentBased) (sdktrace.Sampler, error) {
var rootSampler sdktrace.Sampler
var opts []sdktrace.ParentBasedSamplerOption
var errs []error
var err error
if s.Root == nil {
rootSampler = sdktrace.AlwaysSample()
} else {
rootSampler, err = sampler(s.Root)
if err != nil {
errs = append(errs, err)
}
}
if s.RemoteParentSampled != nil {
remoteParentSampler, err := sampler(s.RemoteParentSampled)
if err != nil {
errs = append(errs, err)
} else {
opts = append(opts, sdktrace.WithRemoteParentSampled(remoteParentSampler))
}
}
if s.RemoteParentNotSampled != nil {
remoteParentNotSampler, err := sampler(s.RemoteParentNotSampled)
if err != nil {
errs = append(errs, err)
} else {
opts = append(opts, sdktrace.WithRemoteParentNotSampled(remoteParentNotSampler))
}
}
if s.LocalParentSampled != nil {
localParentSampler, err := sampler(s.LocalParentSampled)
if err != nil {
errs = append(errs, err)
} else {
opts = append(opts, sdktrace.WithLocalParentSampled(localParentSampler))
}
}
if s.LocalParentNotSampled != nil {
localParentNotSampler, err := sampler(s.LocalParentNotSampled)
if err != nil {
errs = append(errs, err)
} else {
opts = append(opts, sdktrace.WithLocalParentNotSampled(localParentNotSampler))
}
}
if len(errs) > 0 {
return nil, errors.Join(errs...)
}
return sdktrace.ParentBased(rootSampler, opts...), nil
}
func sampler(s *Sampler) (sdktrace.Sampler, error) {
if s == nil {
// If omitted, parent based sampler with a root of always_on is used.
return sdktrace.ParentBased(sdktrace.AlwaysSample()), nil
}
if s.ParentBased != nil {
return parentBasedSampler(s.ParentBased)
}
if s.AlwaysOff != nil {
return sdktrace.NeverSample(), nil
}
if s.AlwaysOn != nil {
return sdktrace.AlwaysSample(), nil
}
if s.TraceIDRatioBased != nil {
if s.TraceIDRatioBased.Ratio == nil {
return sdktrace.TraceIDRatioBased(1), nil
}
return sdktrace.TraceIDRatioBased(*s.TraceIDRatioBased.Ratio), nil
}
return nil, errInvalidSamplerConfiguration
}
func spanExporter(ctx context.Context, exporter SpanExporter) (sdktrace.SpanExporter, error) {
if exporter.Console != nil && exporter.OTLP != nil {
return nil, errors.New("must not specify multiple exporters")
}
if exporter.Console != nil {
return stdouttrace.New(
stdouttrace.WithPrettyPrint(),
)
}
if exporter.OTLP != nil && exporter.OTLP.Protocol != nil {
switch *exporter.OTLP.Protocol {
case protocolProtobufHTTP:
return otlpHTTPSpanExporter(ctx, exporter.OTLP)
case protocolProtobufGRPC:
return otlpGRPCSpanExporter(ctx, exporter.OTLP)
default:
return nil, fmt.Errorf("unsupported protocol %q", *exporter.OTLP.Protocol)
}
}
return nil, errors.New("no valid span exporter")
}
func spanProcessor(ctx context.Context, processor SpanProcessor) (sdktrace.SpanProcessor, error) {
if processor.Batch != nil && processor.Simple != nil {
return nil, errors.New("must not specify multiple span processor type")
}
if processor.Batch != nil {
exp, err := spanExporter(ctx, processor.Batch.Exporter)
if err != nil {
return nil, err
}
return batchSpanProcessor(processor.Batch, exp)
}
if processor.Simple != nil {
exp, err := spanExporter(ctx, processor.Simple.Exporter)
if err != nil {
return nil, err
}
return sdktrace.NewSimpleSpanProcessor(exp), nil
}
return nil, errors.New("unsupported span processor type, must be one of simple or batch")
}
func otlpGRPCSpanExporter(ctx context.Context, otlpConfig *OTLP) (sdktrace.SpanExporter, error) {
var opts []otlptracegrpc.Option
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
// ParseRequestURI leaves the Host field empty when no
// scheme is specified (i.e. localhost:4317). This check is
// here to support the case where a user may not specify a
// scheme. The code does its best effort here by using
// otlpConfig.Endpoint as-is in that case.
if u.Host != "" {
opts = append(opts, otlptracegrpc.WithEndpoint(u.Host))
} else {
opts = append(opts, otlptracegrpc.WithEndpoint(*otlpConfig.Endpoint))
}
if u.Scheme == "http" || (u.Scheme != "https" && otlpConfig.Insecure != nil && *otlpConfig.Insecure) {
opts = append(opts, otlptracegrpc.WithInsecure())
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlptracegrpc.WithCompressor(*otlpConfig.Compression))
case compressionNone:
// none requires no options
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlptracegrpc.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlptracegrpc.WithHeaders(headersConfig))
}
if otlpConfig.Certificate != nil || otlpConfig.ClientCertificate != nil || otlpConfig.ClientKey != nil {
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
return otlptracegrpc.New(ctx, opts...)
}
func otlpHTTPSpanExporter(ctx context.Context, otlpConfig *OTLP) (sdktrace.SpanExporter, error) {
var opts []otlptracehttp.Option
if otlpConfig.Endpoint != nil {
u, err := url.ParseRequestURI(*otlpConfig.Endpoint)
if err != nil {
return nil, err
}
opts = append(opts, otlptracehttp.WithEndpoint(u.Host))
if u.Scheme == "http" {
opts = append(opts, otlptracehttp.WithInsecure())
}
if u.Path != "" {
opts = append(opts, otlptracehttp.WithURLPath(u.Path))
}
}
if otlpConfig.Compression != nil {
switch *otlpConfig.Compression {
case compressionGzip:
opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.GzipCompression))
case compressionNone:
opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.NoCompression))
default:
return nil, fmt.Errorf("unsupported compression %q", *otlpConfig.Compression)
}
}
if otlpConfig.Timeout != nil && *otlpConfig.Timeout > 0 {
opts = append(opts, otlptracehttp.WithTimeout(time.Millisecond*time.Duration(*otlpConfig.Timeout)))
}
headersConfig, err := createHeadersConfig(otlpConfig.Headers, otlpConfig.HeadersList)
if err != nil {
return nil, err
}
if len(headersConfig) > 0 {
opts = append(opts, otlptracehttp.WithHeaders(headersConfig))
}
tlsConfig, err := tls.CreateConfig(otlpConfig.Certificate, otlpConfig.ClientCertificate, otlpConfig.ClientKey)
if err != nil {
return nil, err
}
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
return otlptracehttp.New(ctx, opts...)
}
func batchSpanProcessor(bsp *BatchSpanProcessor, exp sdktrace.SpanExporter) (sdktrace.SpanProcessor, error) {
var opts []sdktrace.BatchSpanProcessorOption
if bsp.ExportTimeout != nil {
if *bsp.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *bsp.ExportTimeout)
}
opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*bsp.ExportTimeout)))
}
if bsp.MaxExportBatchSize != nil {
if *bsp.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *bsp.MaxExportBatchSize)
}
opts = append(opts, sdktrace.WithMaxExportBatchSize(*bsp.MaxExportBatchSize))
}
if bsp.MaxQueueSize != nil {
if *bsp.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *bsp.MaxQueueSize)
}
opts = append(opts, sdktrace.WithMaxQueueSize(*bsp.MaxQueueSize))
}
if bsp.ScheduleDelay != nil {
if *bsp.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *bsp.ScheduleDelay)
}
opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*bsp.ScheduleDelay)))
}
return sdktrace.NewBatchSpanProcessor(exp, opts...), nil
}