// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"bytes"
"sync"
"sync/atomic"
)
type (
// Encoder is a mechanism for serializing an attribute set into a specific
// string representation that supports caching, to avoid repeated
// serialization. An example could be an exporter encoding the attribute
// set into a wire representation.
Encoder interface {
// Encode returns the serialized encoding of the attribute set using
// its Iterator. This result may be cached by an attribute.Set.
Encode(iterator Iterator) string
// ID returns a value that is unique for each class of attribute
// encoder. Attribute encoders allocate these using `NewEncoderID`.
ID() EncoderID
}
// EncoderID is used to identify distinct Encoder
// implementations, for caching encoded results.
EncoderID struct {
value uint64
}
// defaultAttrEncoder uses a sync.Pool of buffers to reduce the number of
// allocations used in encoding attributes. This implementation encodes a
// comma-separated list of key=value pairs, with '\'-escaping of '=', ',',
// and '\'.
defaultAttrEncoder struct {
// pool is a pool of attribute set builders. The buffers in this pool
// grow to a size at which most attribute encodings will not allocate
// new memory.
pool sync.Pool // *bytes.Buffer
}
)
// escapeChar is used to ensure uniqueness of the attribute encoding where
// keys or values contain either '=' or ','. Since there is no parser needed
// for this encoding and its only requirement is to be unique, this choice is
// arbitrary. Users will see these in some exporters (e.g., stdout), so the
// backslash ('\') is used as a conventional choice.
const escapeChar = '\\'
var (
_ Encoder = &defaultAttrEncoder{}
// encoderIDCounter is for generating IDs for other attribute encoders.
encoderIDCounter uint64
defaultEncoderOnce sync.Once
defaultEncoderID = NewEncoderID()
defaultEncoderInstance *defaultAttrEncoder
)
// NewEncoderID returns a unique attribute encoder ID. It should be called
// once for each type of attribute encoder, preferably in an init() function
// or a package-level var declaration.
func NewEncoderID() EncoderID {
return EncoderID{value: atomic.AddUint64(&encoderIDCounter, 1)}
}
// DefaultEncoder returns an attribute encoder that encodes attributes in such
// a way that each escaped attribute's key is followed by an equal sign and
// then by an escaped attribute's value. All key-value pairs are separated by
// a comma.
//
// Escaping is done by prepending a backslash before either a backslash, equal
// sign or a comma.
func DefaultEncoder() Encoder {
defaultEncoderOnce.Do(func() {
defaultEncoderInstance = &defaultAttrEncoder{
pool: sync.Pool{
New: func() any {
return &bytes.Buffer{}
},
},
}
})
return defaultEncoderInstance
}
// Encode is part of an implementation of the Encoder interface.
func (d *defaultAttrEncoder) Encode(iter Iterator) string {
buf := d.pool.Get().(*bytes.Buffer)
defer d.pool.Put(buf)
buf.Reset()
for iter.Next() {
i, keyValue := iter.IndexedAttribute()
if i > 0 {
_ = buf.WriteByte(',')
}
copyAndEscape(buf, string(keyValue.Key))
_ = buf.WriteByte('=')
if keyValue.Value.Type() == STRING {
copyAndEscape(buf, keyValue.Value.AsString())
} else {
_, _ = buf.WriteString(keyValue.Value.Emit())
}
}
return buf.String()
}
// ID is part of an implementation of the Encoder interface.
func (*defaultAttrEncoder) ID() EncoderID {
return defaultEncoderID
}
// copyAndEscape escapes `=`, `,` and its own escape character (`\`),
// making the default encoding unique.
func copyAndEscape(buf *bytes.Buffer, val string) {
for _, ch := range val {
switch ch {
case '=', ',', escapeChar:
_ = buf.WriteByte(escapeChar)
}
_, _ = buf.WriteRune(ch)
}
}
// Valid reports whether this encoder ID was allocated by
// [NewEncoderID]. Invalid encoder IDs will not be cached.
func (id EncoderID) Valid() bool {
return id.value != 0
}
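// The following is an illustrative sketch, not part of the package API: it
// shows the default encoding and its escaping behavior. With the hypothetical
// keys below, the encoded result would be `k\,1=a,k\=2=b`.
func exampleDefaultEncoding() string {
	s := NewSet(String("k,1", "a"), String("k=2", "b"))
	return s.Encoded(DefaultEncoder())
}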
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
// Filter supports removing certain attributes from attribute sets. When
// the filter returns true, the attribute will be kept in the filtered
// attribute set. When the filter returns false, the attribute is excluded
// from the filtered attribute set, and the attribute instead appears in
// the removed list of excluded attributes.
type Filter func(KeyValue) bool
// NewAllowKeysFilter returns a Filter that only allows attributes with one of
// the provided keys.
//
// If keys is empty a deny-all filter is returned.
func NewAllowKeysFilter(keys ...Key) Filter {
if len(keys) == 0 {
return func(KeyValue) bool { return false }
}
allowed := make(map[Key]struct{}, len(keys))
for _, k := range keys {
allowed[k] = struct{}{}
}
return func(kv KeyValue) bool {
_, ok := allowed[kv.Key]
return ok
}
}
// NewDenyKeysFilter returns a Filter that only allows attributes
// that do not have one of the provided keys.
//
// If keys is empty an allow-all filter is returned.
func NewDenyKeysFilter(keys ...Key) Filter {
if len(keys) == 0 {
return func(KeyValue) bool { return true }
}
forbid := make(map[Key]struct{}, len(keys))
for _, k := range keys {
forbid[k] = struct{}{}
}
return func(kv KeyValue) bool {
_, ok := forbid[kv.Key]
return !ok
}
}
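// Illustrative sketch, not part of the package API: filtering a set down to
// an allow-listed key with NewSetWithFiltered. The key names are hypothetical.
func exampleAllowKeysFilter() (Set, []KeyValue) {
	f := NewAllowKeysFilter("http.method")
	return NewSetWithFiltered([]KeyValue{
		String("http.method", "GET"),
		String("http.url", "http://example.com"),
	}, f) // the "http.url" attribute is returned in the excluded slice
}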
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"fmt"
"reflect"
"go.opentelemetry.io/otel/attribute/internal/fnv"
)
// Type identifiers. These identifiers are hashed before the value of the
// corresponding type. This is done to distinguish values that share the same
// raw representation when hashed (e.g. int64(1) and true, []int64{0} and
// int64(0)).
//
// These are all 8-byte strings converted to a uint64 representation. A
// uint64 is used instead of the string directly as an optimization; it
// avoids the for loop in [fnv], which adds minor overhead.
const (
boolID uint64 = 7953749933313450591 // "_boolean" (little endian)
int64ID uint64 = 7592915492740740150 // "64_bit_i" (little endian)
float64ID uint64 = 7376742710626956342 // "64_bit_f" (little endian)
stringID uint64 = 6874584755375207263 // "_string_" (little endian)
boolSliceID uint64 = 6875993255270243167 // "_[]bool_" (little endian)
int64SliceID uint64 = 3762322556277578591 // "_[]int64" (little endian)
float64SliceID uint64 = 7308324551835016539 // "[]double" (little endian)
stringSliceID uint64 = 7453010373645655387 // "[]string" (little endian)
)
// hashKVs returns a new FNV-1a hash of kvs.
func hashKVs(kvs []KeyValue) fnv.Hash {
h := fnv.New()
for _, kv := range kvs {
h = hashKV(h, kv)
}
return h
}
// hashKV returns the FNV-1a hash of kv with h as the base.
func hashKV(h fnv.Hash, kv KeyValue) fnv.Hash {
h = h.String(string(kv.Key))
switch kv.Value.Type() {
case BOOL:
h = h.Uint64(boolID)
h = h.Uint64(kv.Value.numeric)
case INT64:
h = h.Uint64(int64ID)
h = h.Uint64(kv.Value.numeric)
case FLOAT64:
h = h.Uint64(float64ID)
// Assumes numeric stored with math.Float64bits.
h = h.Uint64(kv.Value.numeric)
case STRING:
h = h.Uint64(stringID)
h = h.String(kv.Value.stringly)
case BOOLSLICE:
h = h.Uint64(boolSliceID)
rv := reflect.ValueOf(kv.Value.slice)
for i := 0; i < rv.Len(); i++ {
h = h.Bool(rv.Index(i).Bool())
}
case INT64SLICE:
h = h.Uint64(int64SliceID)
rv := reflect.ValueOf(kv.Value.slice)
for i := 0; i < rv.Len(); i++ {
h = h.Int64(rv.Index(i).Int())
}
case FLOAT64SLICE:
h = h.Uint64(float64SliceID)
rv := reflect.ValueOf(kv.Value.slice)
for i := 0; i < rv.Len(); i++ {
h = h.Float64(rv.Index(i).Float())
}
case STRINGSLICE:
h = h.Uint64(stringSliceID)
rv := reflect.ValueOf(kv.Value.slice)
for i := 0; i < rv.Len(); i++ {
h = h.String(rv.Index(i).String())
}
case INVALID:
default:
// Logging is an alternative, but using the internal logger here
// causes an import cycle so it is not done.
v := kv.Value.AsInterface()
msg := fmt.Sprintf("unknown value type: %[1]v (%[1]T)", v)
panic(msg)
}
return h
}
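// Illustrative sketch, not part of the package API: the type identifiers
// above keep values with identical raw representations from colliding.
// Without them, Bool("k", true) and Int64("k", 1) would contribute the same
// numeric word to the hash.
func exampleTypeIDSeparation() bool {
	h1 := hashKVs([]KeyValue{Bool("k", true)})
	h2 := hashKVs([]KeyValue{Int64("k", 1)})
	return h1 != h2 // expected to be true
}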
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
/*
Package attribute provides several helper functions for commonly used
attribute-processing logic.
*/
package attribute // import "go.opentelemetry.io/otel/attribute/internal"
import (
"reflect"
)
// BoolSliceValue converts a bool slice into an array with the same elements as the slice.
func BoolSliceValue(v []bool) any {
var zero bool
cp := reflect.New(reflect.ArrayOf(len(v), reflect.TypeOf(zero))).Elem()
reflect.Copy(cp, reflect.ValueOf(v))
return cp.Interface()
}
// Int64SliceValue converts an int64 slice into an array with the same elements as the slice.
func Int64SliceValue(v []int64) any {
var zero int64
cp := reflect.New(reflect.ArrayOf(len(v), reflect.TypeOf(zero))).Elem()
reflect.Copy(cp, reflect.ValueOf(v))
return cp.Interface()
}
// Float64SliceValue converts a float64 slice into an array with the same elements as the slice.
func Float64SliceValue(v []float64) any {
var zero float64
cp := reflect.New(reflect.ArrayOf(len(v), reflect.TypeOf(zero))).Elem()
reflect.Copy(cp, reflect.ValueOf(v))
return cp.Interface()
}
// StringSliceValue converts a string slice into an array with the same elements as the slice.
func StringSliceValue(v []string) any {
var zero string
cp := reflect.New(reflect.ArrayOf(len(v), reflect.TypeOf(zero))).Elem()
reflect.Copy(cp, reflect.ValueOf(v))
return cp.Interface()
}
// AsBoolSlice converts a bool array into a slice with the same elements as the array.
func AsBoolSlice(v any) []bool {
rv := reflect.ValueOf(v)
if rv.Type().Kind() != reflect.Array {
return nil
}
cpy := make([]bool, rv.Len())
if len(cpy) > 0 {
_ = reflect.Copy(reflect.ValueOf(cpy), rv)
}
return cpy
}
// AsInt64Slice converts an int64 array into a slice with the same elements as the array.
func AsInt64Slice(v any) []int64 {
rv := reflect.ValueOf(v)
if rv.Type().Kind() != reflect.Array {
return nil
}
cpy := make([]int64, rv.Len())
if len(cpy) > 0 {
_ = reflect.Copy(reflect.ValueOf(cpy), rv)
}
return cpy
}
// AsFloat64Slice converts a float64 array into a slice with the same elements as the array.
func AsFloat64Slice(v any) []float64 {
rv := reflect.ValueOf(v)
if rv.Type().Kind() != reflect.Array {
return nil
}
cpy := make([]float64, rv.Len())
if len(cpy) > 0 {
_ = reflect.Copy(reflect.ValueOf(cpy), rv)
}
return cpy
}
// AsStringSlice converts a string array into a slice with the same elements as the array.
func AsStringSlice(v any) []string {
rv := reflect.ValueOf(v)
if rv.Type().Kind() != reflect.Array {
return nil
}
cpy := make([]string, rv.Len())
if len(cpy) > 0 {
_ = reflect.Copy(reflect.ValueOf(cpy), rv)
}
return cpy
}
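// Illustrative sketch, not part of the package API: slices round-trip through
// the comparable array representation.
func exampleRoundTrip() []int64 {
	arr := Int64SliceValue([]int64{1, 2, 3}) // stored as a [3]int64 array
	return AsInt64Slice(arr)                 // copied back out as []int64
}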
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package fnv provides an efficient and allocation free implementation of the
// FNV-1a, non-cryptographic hash functions created by Glenn Fowler, Landon
// Curt Noll, and Phong Vo. See
// https://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function.
//
// This implementation is provided as an alternative to "hash/fnv". The
// built-in implementation requires two allocations per Write for a string (one
// for the hash pointer and the other to convert a string to a []byte). This
// implementation is more efficient and does not require any allocations.
package fnv // import "go.opentelemetry.io/otel/attribute/internal/fnv"
import (
"math"
)
// Taken from "hash/fnv". Verified at:
//
// - https://datatracker.ietf.org/doc/html/draft-eastlake-fnv-17.html
// - http://www.isthe.com/chongo/tech/comp/fnv/index.html#FNV-param
const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)
// Hash is an FNV-1a hash with appropriate hashing functions for methods.
type Hash uint64
// New returns a new initialized 64-bit FNV-1a Hash. Its value is laid out in
// big-endian byte order.
func New() Hash {
return offset64
}
// Uint64 hashes the eight bytes of val, most significant byte first, and
// returns the updated hash.
func (h Hash) Uint64(val uint64) Hash {
v := uint64(h)
v = (v ^ ((val >> 56) & 0xFF)) * prime64
v = (v ^ ((val >> 48) & 0xFF)) * prime64
v = (v ^ ((val >> 40) & 0xFF)) * prime64
v = (v ^ ((val >> 32) & 0xFF)) * prime64
v = (v ^ ((val >> 24) & 0xFF)) * prime64
v = (v ^ ((val >> 16) & 0xFF)) * prime64
v = (v ^ ((val >> 8) & 0xFF)) * prime64
v = (v ^ ((val >> 0) & 0xFF)) * prime64
return Hash(v)
}
// Bool hashes val as a uint64 of value 0 or 1.
func (h Hash) Bool(val bool) Hash { // nolint:revive // val is not a flag.
if val {
return h.Uint64(1)
}
return h.Uint64(0)
}
// Float64 hashes the IEEE 754 binary representation of val.
func (h Hash) Float64(val float64) Hash {
return h.Uint64(math.Float64bits(val))
}
// Int64 hashes the two's complement representation of val.
func (h Hash) Int64(val int64) Hash {
return h.Uint64(uint64(val)) // nolint:gosec // overflow doesn't matter since we are hashing.
}
// String hashes the bytes of val without allocating.
func (h Hash) String(val string) Hash {
v := uint64(h)
for i := 0; i < len(val); i++ {
v ^= uint64(val[i])
v *= prime64
}
return Hash(v)
}
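// Illustrative sketch, not part of the package API: a Hash is a value type,
// so each method returns the updated hash.
func exampleHash() Hash {
	h := New()
	h = h.String("service.name")
	h = h.Int64(42)
	h = h.Bool(true)
	return h
}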
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
// Iterator allows iterating over the set of attributes in order, sorted by
// key.
type Iterator struct {
storage *Set
idx int
}
// MergeIterator supports iterating over two sets of attributes while
// eliminating duplicate keys from the combined set. When both sets contain
// the same key, the value from the first iterator takes precedence.
type MergeIterator struct {
one oneIterator
two oneIterator
current KeyValue
}
type oneIterator struct {
iter Iterator
done bool
attr KeyValue
}
// Next moves the iterator to the next position.
// Next reports whether there are more attributes.
func (i *Iterator) Next() bool {
i.idx++
return i.idx < i.Len()
}
// Label returns the current KeyValue. It must be called only after Next
// returns true.
//
// Deprecated: Use Attribute instead.
func (i *Iterator) Label() KeyValue {
return i.Attribute()
}
// Attribute returns the current KeyValue of the Iterator. It must be called
// only after Next returns true.
func (i *Iterator) Attribute() KeyValue {
kv, _ := i.storage.Get(i.idx)
return kv
}
// IndexedLabel returns the current index and attribute. It must be called
// only after Next returns true.
//
// Deprecated: Use IndexedAttribute instead.
func (i *Iterator) IndexedLabel() (int, KeyValue) {
return i.idx, i.Attribute()
}
// IndexedAttribute returns the current index and attribute. It must be
// called only after Next returns true.
func (i *Iterator) IndexedAttribute() (int, KeyValue) {
return i.idx, i.Attribute()
}
// Len returns the number of attributes in the iterated set.
func (i *Iterator) Len() int {
return i.storage.Len()
}
// ToSlice is a convenience function that creates a slice of attributes from
// the passed iterator. The iterator is set up to start from the beginning
// before creating the slice.
func (i *Iterator) ToSlice() []KeyValue {
l := i.Len()
if l == 0 {
return nil
}
i.idx = -1
slice := make([]KeyValue, 0, l)
for i.Next() {
slice = append(slice, i.Attribute())
}
return slice
}
// NewMergeIterator returns a MergeIterator for merging two attribute sets.
// Duplicates are resolved by taking the value from the first set.
func NewMergeIterator(s1, s2 *Set) MergeIterator {
mi := MergeIterator{
one: makeOne(s1.Iter()),
two: makeOne(s2.Iter()),
}
return mi
}
func makeOne(iter Iterator) oneIterator {
oi := oneIterator{
iter: iter,
}
oi.advance()
return oi
}
func (oi *oneIterator) advance() {
if oi.done = !oi.iter.Next(); !oi.done {
oi.attr = oi.iter.Attribute()
}
}
// Next moves the iterator to the next position.
// Next reports whether there is another attribute available.
func (m *MergeIterator) Next() bool {
if m.one.done && m.two.done {
return false
}
if m.one.done {
m.current = m.two.attr
m.two.advance()
return true
}
if m.two.done {
m.current = m.one.attr
m.one.advance()
return true
}
if m.one.attr.Key == m.two.attr.Key {
m.current = m.one.attr // first iterator attribute value wins
m.one.advance()
m.two.advance()
return true
}
if m.one.attr.Key < m.two.attr.Key {
m.current = m.one.attr
m.one.advance()
return true
}
m.current = m.two.attr
m.two.advance()
return true
}
// Label returns the current value after Next() returns true.
//
// Deprecated: Use Attribute instead.
func (m *MergeIterator) Label() KeyValue {
return m.current
}
// Attribute returns the current value after Next() returns true.
func (m *MergeIterator) Attribute() KeyValue {
return m.current
}
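// Illustrative sketch, not part of the package API: draining a MergeIterator
// into a slice. Duplicate keys resolve to the value from s1.
func exampleMerge(s1, s2 *Set) []KeyValue {
	var merged []KeyValue
	mi := NewMergeIterator(s1, s2)
	for mi.Next() {
		merged = append(merged, mi.Attribute())
	}
	return merged
}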
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
// Key represents the key part in key-value pairs. It's a string. The
// allowed character set in the key depends on the use of the key.
type Key string
// Bool creates a KeyValue instance with a BOOL Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Bool(name, value).
func (k Key) Bool(v bool) KeyValue {
return KeyValue{
Key: k,
Value: BoolValue(v),
}
}
// BoolSlice creates a KeyValue instance with a BOOLSLICE Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- BoolSlice(name, value).
func (k Key) BoolSlice(v []bool) KeyValue {
return KeyValue{
Key: k,
Value: BoolSliceValue(v),
}
}
// Int creates a KeyValue instance with an INT64 Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Int(name, value).
func (k Key) Int(v int) KeyValue {
return KeyValue{
Key: k,
Value: IntValue(v),
}
}
// IntSlice creates a KeyValue instance with an INT64SLICE Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- IntSlice(name, value).
func (k Key) IntSlice(v []int) KeyValue {
return KeyValue{
Key: k,
Value: IntSliceValue(v),
}
}
// Int64 creates a KeyValue instance with an INT64 Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Int64(name, value).
func (k Key) Int64(v int64) KeyValue {
return KeyValue{
Key: k,
Value: Int64Value(v),
}
}
// Int64Slice creates a KeyValue instance with an INT64SLICE Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Int64Slice(name, value).
func (k Key) Int64Slice(v []int64) KeyValue {
return KeyValue{
Key: k,
Value: Int64SliceValue(v),
}
}
// Float64 creates a KeyValue instance with a FLOAT64 Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Float64(name, value).
func (k Key) Float64(v float64) KeyValue {
return KeyValue{
Key: k,
Value: Float64Value(v),
}
}
// Float64Slice creates a KeyValue instance with a FLOAT64SLICE Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- Float64Slice(name, value).
func (k Key) Float64Slice(v []float64) KeyValue {
return KeyValue{
Key: k,
Value: Float64SliceValue(v),
}
}
// String creates a KeyValue instance with a STRING Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- String(name, value).
func (k Key) String(v string) KeyValue {
return KeyValue{
Key: k,
Value: StringValue(v),
}
}
// StringSlice creates a KeyValue instance with a STRINGSLICE Value.
//
// If creating both a key and value at the same time, use the provided
// convenience function instead -- StringSlice(name, value).
func (k Key) StringSlice(v []string) KeyValue {
return KeyValue{
Key: k,
Value: StringSliceValue(v),
}
}
// Defined reports whether the key is not empty.
func (k Key) Defined() bool {
return len(k) != 0
}
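// Illustrative sketch, not part of the package API: a Key can be declared
// once and reused to build several attributes. The key name is hypothetical.
func exampleKeyReuse() []KeyValue {
	const statusKey = Key("http.status_code")
	return []KeyValue{statusKey.Int(200), statusKey.Int(404)}
}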
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"fmt"
)
// KeyValue holds a key and value pair.
type KeyValue struct {
Key Key
Value Value
}
// Valid reports whether kv is a valid OpenTelemetry attribute.
func (kv KeyValue) Valid() bool {
return kv.Key.Defined() && kv.Value.Type() != INVALID
}
// Bool creates a KeyValue with a BOOL Value type.
func Bool(k string, v bool) KeyValue {
return Key(k).Bool(v)
}
// BoolSlice creates a KeyValue with a BOOLSLICE Value type.
func BoolSlice(k string, v []bool) KeyValue {
return Key(k).BoolSlice(v)
}
// Int creates a KeyValue with an INT64 Value type.
func Int(k string, v int) KeyValue {
return Key(k).Int(v)
}
// IntSlice creates a KeyValue with an INT64SLICE Value type.
func IntSlice(k string, v []int) KeyValue {
return Key(k).IntSlice(v)
}
// Int64 creates a KeyValue with an INT64 Value type.
func Int64(k string, v int64) KeyValue {
return Key(k).Int64(v)
}
// Int64Slice creates a KeyValue with an INT64SLICE Value type.
func Int64Slice(k string, v []int64) KeyValue {
return Key(k).Int64Slice(v)
}
// Float64 creates a KeyValue with a FLOAT64 Value type.
func Float64(k string, v float64) KeyValue {
return Key(k).Float64(v)
}
// Float64Slice creates a KeyValue with a FLOAT64SLICE Value type.
func Float64Slice(k string, v []float64) KeyValue {
return Key(k).Float64Slice(v)
}
// String creates a KeyValue with a STRING Value type.
func String(k, v string) KeyValue {
return Key(k).String(v)
}
// StringSlice creates a KeyValue with a STRINGSLICE Value type.
func StringSlice(k string, v []string) KeyValue {
return Key(k).StringSlice(v)
}
// Stringer creates a new key-value pair with a passed name and a string
// value generated by the passed Stringer interface.
func Stringer(k string, v fmt.Stringer) KeyValue {
return Key(k).String(v.String())
}
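// Illustrative sketch, not part of the package API: the package-level
// constructors above are the usual way to build attributes. The names and
// values are hypothetical.
func exampleKeyValues() []KeyValue {
	return []KeyValue{
		Bool("enabled", true),
		Int64("count", 128),
		Float64("ratio", 0.25),
		String("name", "example"),
	}
}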
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"math"
)
func boolToRaw(b bool) uint64 { // nolint:revive // b is not a control flag.
if b {
return 1
}
return 0
}
func rawToBool(r uint64) bool {
return r != 0
}
func int64ToRaw(i int64) uint64 {
// Assumes original was a valid int64 (overflow not checked).
return uint64(i) // nolint: gosec
}
func rawToInt64(r uint64) int64 {
// Assumes original was a valid int64 (overflow not checked).
return int64(r) // nolint: gosec
}
func float64ToRaw(f float64) uint64 {
return math.Float64bits(f)
}
func rawToFloat64(r uint64) float64 {
return math.Float64frombits(r)
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"cmp"
"encoding/json"
"reflect"
"slices"
"sort"
"go.opentelemetry.io/otel/attribute/internal/fnv"
)
type (
// Set is the representation for a distinct attribute set. It manages an
// immutable set of attributes, with an internal cache for storing
// attribute encodings.
//
// This type will remain comparable for backwards compatibility. The
// equivalence of Sets across versions is not guaranteed to be stable.
// Prior versions may find two Sets to be equal or not when compared
// directly (i.e. ==), but subsequent versions may not. Users should use
// the Equals method to ensure stable equivalence checking.
//
// Users should also use the Distinct returned from Equivalent as a map key
// instead of a Set directly. Set has relatively poor performance when used
// as a map key compared to Distinct.
Set struct {
hash fnv.Hash
data any
}
// Distinct is an identifier of a Set that is very likely to be unique.
//
// Distinct should be used as a map key instead of a Set to provide better
// performance for map operations.
Distinct struct {
hash fnv.Hash
}
// Sortable implements sort.Interface, used for sorting KeyValue.
//
// Deprecated: This type is no longer used. It was added as a performance
// optimization for Go < 1.21 that is no longer needed (Go < 1.21 is no
// longer supported by the module).
Sortable []KeyValue
)
// Compile time check these types remain comparable.
var (
_ = isComparable(Set{})
_ = isComparable(Distinct{})
)
func isComparable[T comparable](t T) T { return t }
var (
// keyValueType is used in computeDistinctReflect.
keyValueType = reflect.TypeOf(KeyValue{})
// userDefinedEmptySet is an empty set. It was mistakenly exposed to users
// as something they can assign to, so it must remain addressable and
// mutable.
//
// This is kept for backwards compatibility, but should not be used in new code.
userDefinedEmptySet = &Set{
hash: fnv.New(),
data: [0]KeyValue{},
}
emptySet = Set{
hash: fnv.New(),
data: [0]KeyValue{},
}
)
// EmptySet returns a reference to a Set with no elements.
//
// This is a convenience provided to avoid allocating a new empty Set for
// each call.
func EmptySet() *Set {
// Continue to return the pointer to the user-defined empty set for
// backwards-compatibility.
//
// New code should not use this, instead use emptySet.
return userDefinedEmptySet
}
// Valid reports whether this value refers to a valid Set.
func (d Distinct) Valid() bool { return d.hash != 0 }
// reflectValue abbreviates reflect.ValueOf(d).
func (l Set) reflectValue() reflect.Value {
return reflect.ValueOf(l.data)
}
// Len returns the number of attributes in this set.
func (l *Set) Len() int {
if l == nil || l.hash == 0 {
return 0
}
return l.reflectValue().Len()
}
// Get returns the KeyValue at ordered position idx in this set.
func (l *Set) Get(idx int) (KeyValue, bool) {
if l == nil || l.hash == 0 {
return KeyValue{}, false
}
value := l.reflectValue()
if idx >= 0 && idx < value.Len() {
// Note: The Go compiler successfully avoids an allocation for
// the interface{} conversion here:
return value.Index(idx).Interface().(KeyValue), true
}
return KeyValue{}, false
}
// Value returns the value of a specified key in this set.
func (l *Set) Value(k Key) (Value, bool) {
if l == nil || l.hash == 0 {
return Value{}, false
}
rValue := l.reflectValue()
vlen := rValue.Len()
idx := sort.Search(vlen, func(idx int) bool {
return rValue.Index(idx).Interface().(KeyValue).Key >= k
})
if idx >= vlen {
return Value{}, false
}
keyValue := rValue.Index(idx).Interface().(KeyValue)
if k == keyValue.Key {
return keyValue.Value, true
}
return Value{}, false
}
// HasValue reports whether a key is defined in this set.
func (l *Set) HasValue(k Key) bool {
if l == nil {
return false
}
_, ok := l.Value(k)
return ok
}
// Iter returns an iterator for visiting the attributes in this set.
func (l *Set) Iter() Iterator {
return Iterator{
storage: l,
idx: -1,
}
}
// ToSlice returns the set of attributes belonging to this set, sorted, where
// keys appear no more than once.
func (l *Set) ToSlice() []KeyValue {
iter := l.Iter()
return iter.ToSlice()
}
// Equivalent returns a value that may be used as a map key. Equal Distinct
// values are very likely to be equivalent attribute Sets: the result will
// equal the Distinct value of any attribute set with the same elements as
// this, where sets are made unique by choosing the last value in the input
// for any given key.
func (l *Set) Equivalent() Distinct {
if l == nil || l.hash == 0 {
return Distinct{hash: emptySet.hash}
}
return Distinct{hash: l.hash}
}
// Equals reports whether the argument set is equivalent to this set.
func (l *Set) Equals(o *Set) bool {
if l.Equivalent() != o.Equivalent() {
return false
}
if l == nil || l.hash == 0 {
l = &emptySet
}
if o == nil || o.hash == 0 {
o = &emptySet
}
return l.data == o.data
}
// Encoded returns the encoded form of this set, according to encoder.
func (l *Set) Encoded(encoder Encoder) string {
if l == nil || encoder == nil {
return ""
}
return encoder.Encode(l.Iter())
}
// NewSet returns a new Set. See the documentation for
// NewSetWithSortableFiltered for more details.
//
// Except for empty sets, this method adds an additional allocation compared
// with calls that include a Sortable.
func NewSet(kvs ...KeyValue) Set {
s, _ := NewSetWithFiltered(kvs, nil)
return s
}
// NewSetWithSortable returns a new Set. See the documentation for
// NewSetWithSortableFiltered for more details.
//
// This call includes a Sortable option as a memory optimization.
//
// Deprecated: Use [NewSet] instead.
func NewSetWithSortable(kvs []KeyValue, _ *Sortable) Set {
s, _ := NewSetWithFiltered(kvs, nil)
return s
}
// NewSetWithFiltered returns a new Set. See the documentation for
// NewSetWithSortableFiltered for more details.
//
// This call includes a Filter to include/exclude attribute keys from the
// return value. Excluded keys are returned as a slice of attribute values.
func NewSetWithFiltered(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
// Check for empty set.
if len(kvs) == 0 {
return emptySet, nil
}
// Stable sort so the following de-duplication can implement
// last-value-wins semantics.
slices.SortStableFunc(kvs, func(a, b KeyValue) int {
return cmp.Compare(a.Key, b.Key)
})
position := len(kvs) - 1
offset := position - 1
// De-duplicate with last-value-wins semantics. The unique last values
// are packed contiguously at the end of the input slice, while the
// overwritten duplicates are swapped to the beginning so the caller does
// not lose any values.
for ; offset >= 0; offset-- {
if kvs[offset].Key == kvs[position].Key {
continue
}
position--
kvs[offset], kvs[position] = kvs[position], kvs[offset]
}
kvs = kvs[position:]
if filter != nil {
if div := filteredToFront(kvs, filter); div != 0 {
return newSet(kvs[div:]), kvs[:div]
}
}
return newSet(kvs), nil
}
// NewSetWithSortableFiltered returns a new Set.
//
// Duplicate keys are eliminated by taking the last value. This
// re-orders the input slice so that unique last-values are contiguous
// at the end of the slice.
//
// This ensures the following:
//
// - Last-value-wins semantics
// - Caller sees the reordering, but doesn't lose values
// - Repeated calls preserve last-value-wins semantics.
//
// Note that methods are defined on Set, although this returns Set. Callers
// can avoid memory allocations by:
//
// - allocating a Sortable for use as a temporary in this method
// - allocating a Set for storing the return value of this constructor.
//
// The result maintains a cache of encoded attributes, by attribute.EncoderID.
// This value should not be copied after its first use.
//
// The second []KeyValue return value is a list of attributes that were
// excluded by the Filter (if non-nil).
//
// Deprecated: Use [NewSetWithFiltered] instead.
func NewSetWithSortableFiltered(kvs []KeyValue, _ *Sortable, filter Filter) (Set, []KeyValue) {
return NewSetWithFiltered(kvs, filter)
}
// filteredToFront filters slice in-place using the keep function. All KeyValues that need to
// be removed are moved to the front. All KeyValues that need to be kept are
// moved (in-order) to the back. The index for the first KeyValue to be kept is
// returned.
func filteredToFront(slice []KeyValue, keep Filter) int {
n := len(slice)
j := n
for i := n - 1; i >= 0; i-- {
if keep(slice[i]) {
j--
slice[i], slice[j] = slice[j], slice[i]
}
}
return j
}
// Filter returns a filtered copy of this Set. See the documentation for
// NewSetWithSortableFiltered for more details.
func (l *Set) Filter(re Filter) (Set, []KeyValue) {
if re == nil {
return *l, nil
}
// Iterate in reverse to the first attribute that will be filtered out.
n := l.Len()
first := n - 1
for ; first >= 0; first-- {
kv, _ := l.Get(first)
if !re(kv) {
break
}
}
// No attributes will be dropped, return the immutable Set l and nil.
if first < 0 {
return *l, nil
}
// Copy now that we know we need to return a modified set.
//
// Do not do this in-place on the underlying storage of *Set l. Sets are
// immutable and filtering should not change this.
slice := l.ToSlice()
// Don't re-iterate the slice if only slice[0] is filtered.
if first == 0 {
// It is safe to assume len(slice) >= 1 given we found at least one
// attribute above that needs to be filtered out.
return newSet(slice[1:]), slice[:1]
}
// Move the filtered slice[first] to the front (preserving order).
kv := slice[first]
copy(slice[1:first+1], slice[:first])
slice[0] = kv
// Do not re-evaluate re(slice[first+1:]).
div := filteredToFront(slice[1:first+1], re) + 1
return newSet(slice[div:]), slice[:div]
}
// newSet returns a new Set based on the sorted and de-duplicated kvs.
func newSet(kvs []KeyValue) Set {
s := Set{
hash: hashKVs(kvs),
data: computeDataFixed(kvs),
}
if s.data == nil {
s.data = computeDataReflect(kvs)
}
return s
}
// computeDataFixed computes a Set data for small slices. It returns nil if the
// input is too large for this code path.
func computeDataFixed(kvs []KeyValue) any {
switch len(kvs) {
case 1:
return [1]KeyValue(kvs)
case 2:
return [2]KeyValue(kvs)
case 3:
return [3]KeyValue(kvs)
case 4:
return [4]KeyValue(kvs)
case 5:
return [5]KeyValue(kvs)
case 6:
return [6]KeyValue(kvs)
case 7:
return [7]KeyValue(kvs)
case 8:
return [8]KeyValue(kvs)
case 9:
return [9]KeyValue(kvs)
case 10:
return [10]KeyValue(kvs)
default:
return nil
}
}
// computeDataReflect computes a Set data using reflection, works for any size
// input.
func computeDataReflect(kvs []KeyValue) any {
at := reflect.New(reflect.ArrayOf(len(kvs), keyValueType)).Elem()
for i, keyValue := range kvs {
*(at.Index(i).Addr().Interface().(*KeyValue)) = keyValue
}
return at.Interface()
}
// MarshalJSON returns the JSON encoding of the Set.
func (l *Set) MarshalJSON() ([]byte, error) {
return json.Marshal(l.data)
}
// MarshalLog is the marshaling function used by the logging system to represent this Set.
func (l Set) MarshalLog() any {
kvs := make(map[string]string)
for _, kv := range l.ToSlice() {
kvs[string(kv.Key)] = kv.Value.Emit()
}
return kvs
}
// Len implements sort.Interface.
func (l *Sortable) Len() int {
return len(*l)
}
// Swap implements sort.Interface.
func (l *Sortable) Swap(i, j int) {
(*l)[i], (*l)[j] = (*l)[j], (*l)[i]
}
// Less implements sort.Interface.
func (l *Sortable) Less(i, j int) bool {
return (*l)[i].Key < (*l)[j].Key
}
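// Illustrative sketch, not part of the package API: using Distinct as a map
// key for per-set state, as the Set documentation above recommends. The map
// type is hypothetical.
func exampleDistinctKey(counts map[Distinct]int, s Set) {
	counts[s.Equivalent()]++
}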
// Code generated by "stringer -type=Type"; DO NOT EDIT.
package attribute
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[INVALID-0]
_ = x[BOOL-1]
_ = x[INT64-2]
_ = x[FLOAT64-3]
_ = x[STRING-4]
_ = x[BOOLSLICE-5]
_ = x[INT64SLICE-6]
_ = x[FLOAT64SLICE-7]
_ = x[STRINGSLICE-8]
}
const _Type_name = "INVALIDBOOLINT64FLOAT64STRINGBOOLSLICEINT64SLICEFLOAT64SLICESTRINGSLICE"
var _Type_index = [...]uint8{0, 7, 11, 16, 23, 29, 38, 48, 60, 71}
func (i Type) String() string {
idx := int(i) - 0
if i < 0 || idx >= len(_Type_index)-1 {
return "Type(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Type_name[_Type_index[idx]:_Type_index[idx+1]]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
attribute "go.opentelemetry.io/otel/attribute/internal"
)
//go:generate stringer -type=Type
// Type describes the type of the data Value holds.
type Type int // nolint: revive // redefines builtin Type.
// Value represents the value part in key-value pairs.
type Value struct {
vtype Type
numeric uint64
stringly string
slice any
}
const (
// INVALID is used for a Value with no value set.
INVALID Type = iota
// BOOL is a boolean Type Value.
BOOL
// INT64 is a 64-bit signed integral Type Value.
INT64
// FLOAT64 is a 64-bit floating point Type Value.
FLOAT64
// STRING is a string Type Value.
STRING
// BOOLSLICE is a slice of booleans Type Value.
BOOLSLICE
// INT64SLICE is a slice of 64-bit signed integral numbers Type Value.
INT64SLICE
// FLOAT64SLICE is a slice of 64-bit floating point numbers Type Value.
FLOAT64SLICE
// STRINGSLICE is a slice of strings Type Value.
STRINGSLICE
)
// BoolValue creates a BOOL Value.
func BoolValue(v bool) Value {
return Value{
vtype: BOOL,
numeric: boolToRaw(v),
}
}
// BoolSliceValue creates a BOOLSLICE Value.
func BoolSliceValue(v []bool) Value {
return Value{vtype: BOOLSLICE, slice: attribute.BoolSliceValue(v)}
}
// IntValue creates an INT64 Value.
func IntValue(v int) Value {
return Int64Value(int64(v))
}
// IntSliceValue creates an INT64SLICE Value.
func IntSliceValue(v []int) Value {
var int64Val int64
cp := reflect.New(reflect.ArrayOf(len(v), reflect.TypeOf(int64Val)))
for i, val := range v {
cp.Elem().Index(i).SetInt(int64(val))
}
return Value{
vtype: INT64SLICE,
slice: cp.Elem().Interface(),
}
}
// Int64Value creates an INT64 Value.
func Int64Value(v int64) Value {
return Value{
vtype: INT64,
numeric: int64ToRaw(v),
}
}
// Int64SliceValue creates an INT64SLICE Value.
func Int64SliceValue(v []int64) Value {
return Value{vtype: INT64SLICE, slice: attribute.Int64SliceValue(v)}
}
// Float64Value creates a FLOAT64 Value.
func Float64Value(v float64) Value {
return Value{
vtype: FLOAT64,
numeric: float64ToRaw(v),
}
}
// Float64SliceValue creates a FLOAT64SLICE Value.
func Float64SliceValue(v []float64) Value {
return Value{vtype: FLOAT64SLICE, slice: attribute.Float64SliceValue(v)}
}
// StringValue creates a STRING Value.
func StringValue(v string) Value {
return Value{
vtype: STRING,
stringly: v,
}
}
// StringSliceValue creates a STRINGSLICE Value.
func StringSliceValue(v []string) Value {
return Value{vtype: STRINGSLICE, slice: attribute.StringSliceValue(v)}
}
// Type returns the type of the Value.
func (v Value) Type() Type {
return v.vtype
}
// AsBool returns the bool value. Make sure that the Value's type is
// BOOL.
func (v Value) AsBool() bool {
return rawToBool(v.numeric)
}
// AsBoolSlice returns the []bool value. Make sure that the Value's type is
// BOOLSLICE.
func (v Value) AsBoolSlice() []bool {
if v.vtype != BOOLSLICE {
return nil
}
return v.asBoolSlice()
}
func (v Value) asBoolSlice() []bool {
return attribute.AsBoolSlice(v.slice)
}
// AsInt64 returns the int64 value. Make sure that the Value's type is
// INT64.
func (v Value) AsInt64() int64 {
return rawToInt64(v.numeric)
}
// AsInt64Slice returns the []int64 value. Make sure that the Value's type is
// INT64SLICE.
func (v Value) AsInt64Slice() []int64 {
if v.vtype != INT64SLICE {
return nil
}
return v.asInt64Slice()
}
func (v Value) asInt64Slice() []int64 {
return attribute.AsInt64Slice(v.slice)
}
// AsFloat64 returns the float64 value. Make sure that the Value's
// type is FLOAT64.
func (v Value) AsFloat64() float64 {
return rawToFloat64(v.numeric)
}
// AsFloat64Slice returns the []float64 value. Make sure that the Value's type is
// FLOAT64SLICE.
func (v Value) AsFloat64Slice() []float64 {
if v.vtype != FLOAT64SLICE {
return nil
}
return v.asFloat64Slice()
}
func (v Value) asFloat64Slice() []float64 {
return attribute.AsFloat64Slice(v.slice)
}
// AsString returns the string value. Make sure that the Value's type
// is STRING.
func (v Value) AsString() string {
return v.stringly
}
// AsStringSlice returns the []string value. Make sure that the Value's type is
// STRINGSLICE.
func (v Value) AsStringSlice() []string {
if v.vtype != STRINGSLICE {
return nil
}
return v.asStringSlice()
}
func (v Value) asStringSlice() []string {
return attribute.AsStringSlice(v.slice)
}
type unknownValueType struct{}
// AsInterface returns Value's data as any.
func (v Value) AsInterface() any {
switch v.Type() {
case BOOL:
return v.AsBool()
case BOOLSLICE:
return v.asBoolSlice()
case INT64:
return v.AsInt64()
case INT64SLICE:
return v.asInt64Slice()
case FLOAT64:
return v.AsFloat64()
case FLOAT64SLICE:
return v.asFloat64Slice()
case STRING:
return v.stringly
case STRINGSLICE:
return v.asStringSlice()
}
return unknownValueType{}
}
// Emit returns a string representation of Value's data.
func (v Value) Emit() string {
switch v.Type() {
case BOOLSLICE:
return fmt.Sprint(v.asBoolSlice())
case BOOL:
return strconv.FormatBool(v.AsBool())
case INT64SLICE:
j, err := json.Marshal(v.asInt64Slice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asInt64Slice())
}
return string(j)
case INT64:
return strconv.FormatInt(v.AsInt64(), 10)
case FLOAT64SLICE:
j, err := json.Marshal(v.asFloat64Slice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asFloat64Slice())
}
return string(j)
case FLOAT64:
return fmt.Sprint(v.AsFloat64())
case STRINGSLICE:
j, err := json.Marshal(v.asStringSlice())
if err != nil {
return fmt.Sprintf("invalid: %v", v.asStringSlice())
}
return string(j)
case STRING:
return v.stringly
default:
return "unknown"
}
}
// MarshalJSON returns the JSON encoding of the Value.
func (v Value) MarshalJSON() ([]byte, error) {
var jsonVal struct {
Type string
Value any
}
jsonVal.Type = v.Type().String()
jsonVal.Value = v.AsInterface()
return json.Marshal(jsonVal)
}
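// Illustrative sketch, not part of the package API: inspecting a Value
// generically. Emit renders the INT64SLICE as JSON ("[1,2]") while
// AsInterface returns the underlying []int64.
func exampleValueInspection() (string, any) {
	v := IntSliceValue([]int{1, 2})
	return v.Emit(), v.AsInterface()
}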
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"go.opentelemetry.io/otel/trace"
)
// Filter determines if a measurement should be offered.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
type Filter func(context.Context) bool
// TraceBasedFilter is a [Filter] that will only offer measurements
// if the passed context associated with the measurement contains a sampled
// [go.opentelemetry.io/otel/trace.SpanContext].
func TraceBasedFilter(ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}
// AlwaysOnFilter is a [Filter] that always offers measurements.
func AlwaysOnFilter(context.Context) bool {
return true
}
// AlwaysOffFilter is a [Filter] that never offers measurements.
func AlwaysOffFilter(context.Context) bool {
return false
}
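// Illustrative sketch, not part of the package API: filters compose as plain
// functions. The appPredicate parameter is a hypothetical app-specific check.
func exampleCombinedFilter(appPredicate func(context.Context) bool) Filter {
	return func(ctx context.Context) bool {
		return TraceBasedFilter(ctx) && appPredicate(ctx)
	}
}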
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"math"
"math/rand/v2"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/reservoir"
)
// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func(attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If k or fewer measurements are made, the Reservoir will
// sample each one. If more than k are made, the Reservoir will then randomly
// sample all additional measurements with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
}
var _ Reservoir = &FixedSizeReservoir{}
// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
// k or fewer measurements are made, the Reservoir will sample each one. If
// more than k are made, the Reservoir will then randomly sample all
// additional measurements with a decreasing probability.
type FixedSizeReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex
// count is the number of measurements seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next value of next.
w float64
}
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
}
r.reset()
return r
}
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (*FixedSizeReservoir) randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
}
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The n and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
//
// A high-level overview of "Algorithm L":
// 0) Pre-calculate the random count greater than the storage size when
// an exemplar will be replaced.
// 1) Accept all measurements offered until the configured storage size is
// reached.
// 2) Loop:
// a) When the pre-calculated count is reached, replace a random
// existing exemplar with the offered measurement.
// b) Calculate the next random count greater than the existing one
// which will replace another exemplar.
//
// The way a "replacement" count is computed is by looking at `n` number of
// independent random numbers each corresponding to an offered measurement.
// Of these numbers the smallest `k` (the same size as the storage
// capacity) of them are kept as a subset. The maximum value in this
// subset, called `w` is used to weight another random number generation
// for the next count that will be considered.
//
// By weighting the next count computation as described, the algorithm
// performs uniformly-weighted sampling based on the number of
// samples the reservoir has seen so far. The sampling will "slow down" as
// more and more samples are offered so as to reduce a bias towards those
// offered just prior to the end of the collection.
//
// This algorithm is preferred because of its balance of simplicity and
// performance. It will compute three random numbers (the bulk of
// computation time) for each item that becomes part of the reservoir, but
// it does not spend any time on items that do not. In particular it has an
// asymptotic runtime of O(k(1 + log(n/k))) where n is the number of
// measurements offered and k is the reservoir size.
//
// See https://en.wikipedia.org/wiki/Reservoir_sampling for an overview of
// this and other reservoir sampling algorithms. See
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.
r.mu.Lock()
defer r.mu.Unlock()
if int(r.count) < cap(r.measurements) {
r.store(int(r.count), newMeasurement(ctx, t, n, a))
} else if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.measurements))))
r.store(idx, newMeasurement(ctx, t, n, a))
r.advance()
}
r.count++
}
// reset resets r to the initial state.
func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.measurements))
// Initial random number in the series used to generate r.next.
//
// This is set before r.advance to reset or initialize the random number
// series. Without doing so it would always be 0 or never restart a new
// random number series.
//
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
r.advance()
}
// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
// numbers (i.e. `w = max(u_1,u_2,...,u_k)` for `k` equal to the capacity
// of the storage and each `u` in the interval (0,w)). To calculate the
// next r.w we use the fact that when the next exemplar is selected to be
// included in the storage an existing one will be dropped, and the
// corresponding random number in the set used to calculate r.w will also
// be replaced. The replacement random number will also be within (0,w),
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
// Given 0 < r.w < 1, each iteration will result in subsequent r.w being
// smaller. This translates here into the next value of next being selected against
// a distribution with a higher mean (i.e. the expected value will increase
// and replacements become less likely).
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
}
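// Illustrative sketch, not part of the package API: the reservoir life cycle.
// Offer more measurements than the capacity, then collect the sampled subset.
func exampleFixedSize(ctx context.Context) []Exemplar {
	r := NewFixedSizeReservoir(2)
	for i := int64(0); i < 10; i++ {
		r.Offer(ctx, time.Now(), NewValue(i), nil)
	}
	var dest []Exemplar
	r.Collect(&dest) // at most 2 exemplars survive
	return dest
}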
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"slices"
"sort"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/reservoir"
)
// HistogramReservoirProvider returns a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func(attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}
// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are defined by bounds.
//
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}
var _ Reservoir = &HistogramReservoir{}
// HistogramReservoir is a [Reservoir] that samples the last measurement that
// falls within a histogram bucket. The histogram bucket upper-boundaries are
// defined by bounds.
type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex
// bounds are bucket bounds in ascending order.
bounds []float64
}
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var n float64
switch v.Type() {
case Int64ValueType:
n = float64(v.Int64())
case Float64ValueType:
n = v.Float64()
default:
panic("unknown value type")
}
idx := sort.SearchFloat64s(r.bounds, n)
m := newMeasurement(ctx, t, v, a)
r.mu.Lock()
defer r.mu.Unlock()
r.store(idx, m)
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
}
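// Illustrative sketch, not part of the package API: with bounds [0, 10, 100]
// there are four exemplar slots; a measurement of 7 lands in the (0, 10]
// slot, overwriting any previous exemplar stored there.
func exampleHistogram(ctx context.Context) []Exemplar {
	r := NewHistogramReservoir([]float64{0, 10, 100})
	r.Offer(ctx, time.Now(), NewValue(7.0), nil)
	var dest []Exemplar
	r.Collect(&dest)
	return dest
}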
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// storage is an exemplar storage for [Reservoir] implementations.
type storage struct {
// measurements are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
// require an allocation for trace and span IDs in the hot path of Offer.
measurements []measurement
}
func newStorage(n int) *storage {
return &storage{measurements: make([]measurement, n)}
}
func (r *storage) store(idx int, m measurement) {
r.measurements[idx] = m
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *storage) Collect(dest *[]Exemplar) {
*dest = reset(*dest, len(r.measurements), len(r.measurements))
var n int
for _, m := range r.measurements {
if !m.valid {
continue
}
m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
}
// measurement is a measurement made by a telemetry system.
type measurement struct {
// FilteredAttributes are the attributes dropped during the measurement.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was made.
Time time.Time
// Value is the value of the measurement.
Value Value
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext
valid bool
}
// newMeasurement returns a new valid measurement.
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
return measurement{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
}
}
// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}
if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
}
}
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
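// Illustrative sketch, not part of the package API: reset reuses the backing
// array across collection cycles when it is large enough, avoiding a
// per-Collect allocation.
func exampleReset(buf []Exemplar) []Exemplar {
	return reset(buf, 0, 4) // reuses buf's array if cap(buf) >= 4
}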
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import "math"
// ValueType identifies the type of value used in exemplar data.
type ValueType uint8
const (
// UnknownValueType should not be used. It represents a misconfigured
// Value.
UnknownValueType ValueType = 0
// Int64ValueType represents a Value with int64 data.
Int64ValueType ValueType = 1
// Float64ValueType represents a Value with float64 data.
Float64ValueType ValueType = 2
)
// Value is the value of data held by an exemplar.
type Value struct {
t ValueType
val uint64
}
// NewValue returns a new [Value] for the provided value.
func NewValue[N int64 | float64](value N) Value {
switch v := any(value).(type) {
case int64:
// This can be later converted back to int64 (overflow not checked).
return Value{t: Int64ValueType, val: uint64(v)} // nolint:gosec
case float64:
return Value{t: Float64ValueType, val: math.Float64bits(v)}
}
return Value{}
}
// Type returns the [ValueType] of data held by v.
func (v Value) Type() ValueType { return v.t }
// Int64 returns the value of v as an int64. If the ValueType of v is not an
// Int64ValueType, 0 is returned.
func (v Value) Int64() int64 {
if v.t == Int64ValueType {
// Assumes the correct int64 was stored in v.val based on type.
return int64(v.val) // nolint: gosec
}
return 0
}
// Float64 returns the value of v as a float64. If the ValueType of v is not
// a Float64ValueType, 0 is returned.
func (v Value) Float64() float64 {
if v.t == Float64ValueType {
return math.Float64frombits(v.val)
}
return 0
}
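// Illustrative sketch, not part of the package API: both numeric types are
// stored in the single val word and recovered by type.
func exampleValues() (int64, float64) {
	i := NewValue(int64(42))
	f := NewValue(3.14)
	return i.Int64(), f.Float64() // 42, 3.14
}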
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// now is used to return the current local time while allowing tests to
// override the default time.Now function.
var now = time.Now
// Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
// ComputeAggregation stores the aggregate of measurements into dest and
// returns the number of aggregate data-points output.
type ComputeAggregation func(dest *metricdata.Aggregation) int
// Builder builds an aggregate function.
type Builder[N int64 | float64] struct {
// Temporality is the temporality used for the returned aggregate function.
//
// If this is not provided, a default of cumulative will be used (except
// for the last-value aggregate function where delta is the only
// appropriate temporality).
Temporality metricdata.Temporality
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for each newly seen attribute set.
//
// If this is not provided, a default factory function returning a
// dropReservoir will be used.
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Once
// the limit has been reached, any measurement with a new attribute set will
// be aggregated into a single aggregate for the "otel.metric.overflow"
// attribute.
//
// If AggregationLimit is less than or equal to zero, no aggregation limit
// will be imposed (i.e. unlimited attribute sets).
AggregationLimit int
}
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
return dropReservoir
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, dropped := a.Filter(fltr)
f(ctx, n, fAttr, dropped)
}
}
return func(ctx context.Context, n N, a attribute.Set) {
f(ctx, n, a, nil)
}
}
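// exampleBuilderFilter is an illustrative sketch (hypothetical, not part of
// the package): it shows how a Builder's Filter splits each measurement's
// attributes into the filtered set used for aggregation and the dropped
// key-values that are forwarded for exemplar recording.
func exampleBuilderFilter() {
b := Builder[int64]{
Filter: func(kv attribute.KeyValue) bool { return kv.Key == "keep" },
}
m := b.filter(func(_ context.Context, _ int64, fltr attribute.Set, dropped []attribute.KeyValue) {
_ = fltr    // Contains only "keep".
_ = dropped // Contains "drop"; offered to exemplar reservoirs.
})
attrs := attribute.NewSet(attribute.String("keep", "a"), attribute.String("drop", "b"))
m(context.Background(), 1, attrs)
}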
// LastValue returns a last-value aggregate function input and output.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}
// PrecomputedLastValue returns a last-value aggregate function input and
// output. The aggregation returned from the returned ComputeAggregation
// function will only contain values from the previous collection cycle.
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
default:
return b.filter(s.measure), s.cumulative
}
}
// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
switch b.Temporality {
case metricdata.DeltaTemporality:
s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.collect
default:
s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc())
return b.filter(s.measure), s.collect
}
}
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(
boundaries []float64,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
default:
return b.filter(h.measure), h.cumulative
}
}
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(
maxSize, maxScale int32,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
default:
return b.filter(h.measure), h.cumulative
}
}
// reset ensures s has capacity and sets its length. If the capacity of s is
// too small, a new slice is returned with the specified length and capacity.
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"math"
"runtime"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/attribute"
)
// atomicCounter is an efficient way of adding to a number which is either an
// int64 or float64. It is designed to be efficient when adding whole
// numbers, regardless of whether N is an int64 or float64.
//
// Inspired by the Prometheus counter implementation:
// https://github.com/prometheus/client_golang/blob/14ccb93091c00f86b85af7753100aa372d63602b/prometheus/counter.go#L108
type atomicCounter[N int64 | float64] struct {
// nFloatBits contains only the non-integer portion of the counter.
nFloatBits atomic.Uint64
// nInt contains only the integer portion of the counter.
nInt atomic.Int64
}
// load returns the current value. The caller must ensure all calls to add have
// returned prior to calling load.
func (n *atomicCounter[N]) load() N {
fval := math.Float64frombits(n.nFloatBits.Load())
ival := n.nInt.Load()
return N(fval + float64(ival))
}
func (n *atomicCounter[N]) add(value N) {
ival := int64(value)
// This case is where the value is an int, or if it is a whole-numbered float.
if float64(ival) == float64(value) {
n.nInt.Add(ival)
return
}
// Value must be a float below.
for {
oldBits := n.nFloatBits.Load()
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
return
}
}
}
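// exampleAtomicCounter is an illustrative sketch (hypothetical, not part of
// the package): whole-numbered values take the cheap atomic integer path,
// fractional values take the CAS loop on the float bits, and load
// reconciles the two portions.
func exampleAtomicCounter() {
var c atomicCounter[float64]
c.add(2)     // Whole number: atomic integer add on nInt.
c.add(0.5)   // Fractional: CAS loop on nFloatBits.
_ = c.load() // 2.5, once all add calls have returned.
}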
// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
// Conceptually, it can be thought of as a "hot" wait group,
// and a "cold" wait group, with the ability for the reader to atomically swap
// the hot and cold wait groups, and wait for the now-cold wait group to
// complete.
//
// Inspired by the prometheus/client_golang histogram implementation:
// https://github.com/prometheus/client_golang/blob/a974e0d45e0aa54c65492559114894314d8a2447/prometheus/histogram.go#L725
//
// Usage:
//
// var hcwg hotColdWaitGroup
// var data [2]any
//
// func write() {
// hotIdx := hcwg.start()
// defer hcwg.done(hotIdx)
// // modify data without locking
// data[hotIdx].update()
// }
//
// func read() {
// coldIdx := hcwg.swapHotAndWait()
// // read data now that all writes to the cold data have completed.
// data[coldIdx].read()
// }
type hotColdWaitGroup struct {
// startedCountAndHotIdx contains a 63-bit counter in the lower bits,
// and a 1-bit hot index denoting which of the two data points new
// measurements should be written to. These are contained together so
// that read() can atomically swap the hot bit, reset the started writes
// to zero, and read the number of writes that were started prior to the
// hot bit being swapped.
startedCountAndHotIdx atomic.Uint64
// endedCounts is the number of writes that have completed to each
// dataPoint.
endedCounts [2]atomic.Uint64
}
// start returns the hot index that the writer should write to. The returned
// hot index is 0 or 1. The caller must call done(hot index) after it finishes
// its operation. start() is safe to call concurrently with other methods.
func (l *hotColdWaitGroup) start() uint64 {
// We increment l.startedCountAndHotIdx so that the counter in the lower
// 63 bits gets incremented. At the same time, we get the new value
// back, which we can use to return the currently-hot index.
return l.startedCountAndHotIdx.Add(1) >> 63
}
// done signals to the reader that an operation has fully completed.
// done is safe to call concurrently.
func (l *hotColdWaitGroup) done(hotIdx uint64) {
l.endedCounts[hotIdx].Add(1)
}
// swapHotAndWait swaps the hot bit, waits for all start() calls to be done(),
// and then returns the now-cold index for the reader to read from. The
// returned index is 0 or 1. swapHotAndWait must not be called concurrently.
func (l *hotColdWaitGroup) swapHotAndWait() uint64 {
n := l.startedCountAndHotIdx.Load()
coldIdx := (^n) >> 63
// Swap the hot and cold index while resetting the started measurements
// count to zero.
n = l.startedCountAndHotIdx.Swap((coldIdx << 63))
hotIdx := n >> 63
startedCount := n & ((1 << 63) - 1)
// Wait for all measurements to the previously-hot map to finish.
for startedCount != l.endedCounts[hotIdx].Load() {
runtime.Gosched() // Let measurements complete.
}
// reset the number of ended operations
l.endedCounts[hotIdx].Store(0)
return hotIdx
}
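// exampleHotColdWaitGroup is an illustrative, compiling sketch (hypothetical,
// not part of the package) of the usage described above, with atomic
// counters standing in for the pseudo data type.
func exampleHotColdWaitGroup() {
var hcwg hotColdWaitGroup
var data [2]atomic.Uint64
// Writer: lock-free update of the hot copy.
hotIdx := hcwg.start()
data[hotIdx].Add(1)
hcwg.done(hotIdx)
// Reader: swap hot and cold, then read the quiesced cold copy.
coldIdx := hcwg.swapHotAndWait()
_ = data[coldIdx].Load()
data[coldIdx].Store(0) // Reset before this copy becomes hot again.
}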
// limitedSyncMap is a sync.Map which enforces the aggregation limit on
// attribute sets and provides a Len() function.
type limitedSyncMap struct {
sync.Map
aggLimit int
len int
lenMux sync.Mutex
}
func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(attribute.Set) any) any {
actual, loaded := m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}
// If the overflow set exists, assume we have already overflowed and don't
// bother with the slow path below.
actual, loaded = m.Load(overflowSet.Equivalent())
if loaded {
return actual
}
// Slow path: add a new attribute set.
m.lenMux.Lock()
defer m.lenMux.Unlock()
// re-fetch now that we hold the lock to ensure we don't use the overflow
// set unless we are sure the attribute set isn't being written
// concurrently.
actual, loaded = m.Load(fltrAttr.Equivalent())
if loaded {
return actual
}
if m.aggLimit > 0 && m.len >= m.aggLimit-1 {
fltrAttr = overflowSet
}
actual, loaded = m.LoadOrStore(fltrAttr.Equivalent(), newValue(fltrAttr))
if !loaded {
m.len++
}
return actual
}
func (m *limitedSyncMap) Clear() {
m.lenMux.Lock()
defer m.lenMux.Unlock()
m.len = 0
m.Map.Clear()
}
func (m *limitedSyncMap) Len() int {
m.lenMux.Lock()
defer m.lenMux.Unlock()
return m.len
}
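// exampleLimitedSyncMap is an illustrative sketch (hypothetical, not part of
// the package): with aggLimit = 2, one slot is effectively reserved for the
// overflow series, so the second distinct attribute set is stored under
// overflowSet instead of its own key.
func exampleLimitedSyncMap() {
m := limitedSyncMap{aggLimit: 2}
a := attribute.NewSet(attribute.String("k", "a"))
b := attribute.NewSet(attribute.String("k", "b"))
_ = m.LoadOrStoreAttr(a, func(attribute.Set) any { return new(int) }) // Stored under a.
_ = m.LoadOrStoreAttr(b, func(attribute.Set) any { return new(int) }) // Redirected to overflowSet.
}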
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// dropReservoir returns a [FilteredExemplarReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}
type dropRes[N int64 | float64] struct{}
// Offer does nothing; all measurements offered will be dropped.
func (*dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (*dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
clear(*dest) // Erase elements to let GC collect objects
*dest = (*dest)[:0]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
var exemplarPool = sync.Pool{
New: func() any { return new([]exemplar.Exemplar) },
}
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
defer func() {
clear(*dest) // Erase elements to let GC collect objects.
*dest = (*dest)[:0]
exemplarPool.Put(dest)
}()
*dest = reset(*dest, len(*out), cap(*out))
f(dest)
*out = reset(*out, len(*dest), cap(*dest))
for i, e := range *dest {
(*out)[i].FilteredAttributes = e.FilteredAttributes
(*out)[i].Time = e.Time
(*out)[i].SpanID = e.SpanID
(*out)[i].TraceID = e.TraceID
switch e.Value.Type() {
case exemplar.Int64ValueType:
(*out)[i].Value = N(e.Value.Int64())
case exemplar.Float64ValueType:
(*out)[i].Value = N(e.Value.Float64())
}
}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"errors"
"math"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
const (
expoMaxScale = 20
expoMinScale = -10
smallestNonZeroNormalFloat64 = 0x1p-1022
// These redefine the math package constants with a type, so the compiler
// won't coerce them into an int on 32-bit platforms.
maxInt64 int64 = math.MaxInt64
minInt64 int64 = math.MinInt64
)
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
count uint64
min N
max N
sum N
maxSize int
noMinMax bool
noSum bool
scale int32
posBuckets expoBuckets
negBuckets expoBuckets
zeroCount uint64
}
func newExpoHistogramDataPoint[N int64 | float64](
attrs attribute.Set,
maxSize int,
maxScale int32,
noMinMax, noSum bool,
) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag
f := math.MaxFloat64
ma := N(f) // if N is int64, max will overflow to -9223372036854775808
mi := N(-f)
if N(maxInt64) > N(f) {
ma = N(maxInt64)
mi = N(minInt64)
}
return &expoHistogramDataPoint[N]{
attrs: attrs,
min: ma,
max: mi,
maxSize: maxSize,
noMinMax: noMinMax,
noSum: noSum,
scale: maxScale,
}
}
// record adds a new measurement to the histogram. It will rescale the buckets if needed.
func (p *expoHistogramDataPoint[N]) record(v N) {
p.count++
if !p.noMinMax {
if v < p.min {
p.min = v
}
if v > p.max {
p.max = v
}
}
if !p.noSum {
p.sum += v
}
absV := math.Abs(float64(v))
if absV == 0.0 {
p.zeroCount++
return
}
bin := p.getBin(absV)
bucket := &p.posBuckets
if v < 0 {
bucket = &p.negBuckets
}
// If the new bin would make the counts larger than maxSize, we need to
// downscale current measurements.
if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
if p.scale-scaleDelta < expoMinScale {
// With a scale of -10 there are only two buckets for the whole range of float64 values.
// This can only happen if there is a max size of 1.
otel.Handle(errors.New("exponential histogram scale underflow"))
return
}
// Downscale
p.scale -= scaleDelta
p.posBuckets.downscale(scaleDelta)
p.negBuckets.downscale(scaleDelta)
bin = p.getBin(absV)
}
bucket.record(bin)
}
// getBin returns the bin v should be recorded into.
func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
frac, expInt := math.Frexp(v)
// The 11-bit exponent of a float64 always fits in an int32.
exp := int32(expInt) // nolint: gosec
if p.scale <= 0 {
// Because Frexp returns a fraction in [0.5, 1), the exponent is always
// one power of two higher than we want.
var correction int32 = 1
if frac == .5 {
// If v is an exact power of two the frac will be .5 and the exp
// will be one higher than we want.
correction = 2
}
return (exp - correction) >> (-p.scale)
}
return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1
}
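// exampleGetBinScaleZero is an illustrative sketch (hypothetical, not part
// of the package) of the scale-0 mapping, where bin i covers the interval
// (2^i, 2^(i+1)] and exact powers of two land in the lower bucket.
func exampleGetBinScaleZero() {
p := &expoHistogramDataPoint[float64]{scale: 0}
_ = p.getBin(4) // 1: 4 is an exact power of two, so it falls in (2, 4].
_ = p.getBin(5) // 2: 5 falls in (4, 8].
}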
// scaleFactors are constants used in calculating the logarithm index. They
// are equivalent to 2^index / ln(2).
var scaleFactors = [21]float64{
math.Ldexp(math.Log2E, 0),
math.Ldexp(math.Log2E, 1),
math.Ldexp(math.Log2E, 2),
math.Ldexp(math.Log2E, 3),
math.Ldexp(math.Log2E, 4),
math.Ldexp(math.Log2E, 5),
math.Ldexp(math.Log2E, 6),
math.Ldexp(math.Log2E, 7),
math.Ldexp(math.Log2E, 8),
math.Ldexp(math.Log2E, 9),
math.Ldexp(math.Log2E, 10),
math.Ldexp(math.Log2E, 11),
math.Ldexp(math.Log2E, 12),
math.Ldexp(math.Log2E, 13),
math.Ldexp(math.Log2E, 14),
math.Ldexp(math.Log2E, 15),
math.Ldexp(math.Log2E, 16),
math.Ldexp(math.Log2E, 17),
math.Ldexp(math.Log2E, 18),
math.Ldexp(math.Log2E, 19),
math.Ldexp(math.Log2E, 20),
}
// scaleChange returns the magnitude of the scale change needed to fit bin in
// the bucket. If no scale change is needed 0 is returned.
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
if length == 0 {
// No need to rescale if there are no buckets.
return 0
}
low := int(startBin)
high := int(bin)
if startBin >= bin {
low = int(bin)
high = int(startBin) + length - 1
}
var count int32
for high-low >= p.maxSize {
low >>= 1
high >>= 1
count++
if count > expoMaxScale-expoMinScale {
return count
}
}
return count
}
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int32
counts []uint64
}
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.startBin = bin
return
}
endBin := int(b.startBin) + len(b.counts) - 1
// if the new bin is inside the current range
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
return
}
// if the new bin is before the current start add spaces to the counts
if bin < b.startBin {
origLen := len(b.counts)
newLength := endBin - int(bin) + 1
shift := b.startBin - bin
if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
}
copy(b.counts[shift:origLen+int(shift)], b.counts)
b.counts = b.counts[:newLength]
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
}
b.startBin = bin
b.counts[0] = 1
return
}
// if the new bin is after the end, add spaces to the end
if int(bin) > endBin {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
}
b.counts[bin-b.startBin] = 1
return
}
end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
}
}
// downscale shrinks the buckets by a factor of 2^delta. It will sum counts
// into the correct lower resolution bucket.
func (b *expoBuckets) downscale(delta int32) {
// Example
// delta = 2
// Original offset: -6
// Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4
// new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1
// new Offset: -2
// new Counts: [4, 14, 30, 10]
if len(b.counts) <= 1 || delta < 1 {
b.startBin >>= delta
return
}
steps := int32(1) << delta
offset := b.startBin % steps
offset = (offset + steps) % steps // to make offset positive
for i := 1; i < len(b.counts); i++ {
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
continue
}
b.counts[idx/int(steps)] += b.counts[i]
}
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
b.counts = b.counts[:lastIdx+1]
b.startBin >>= delta
}
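// exampleDownscale is an illustrative sketch (hypothetical, not part of the
// package) reproducing the worked example above: downscale(2) merges runs of
// 2^2 = 4 adjacent bins into single lower-resolution bins.
func exampleDownscale() {
b := expoBuckets{startBin: -6, counts: []uint64{3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}
b.downscale(2)
// Now b.startBin == -2 and b.counts == []uint64{4, 14, 30, 10}.
}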
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](
maxSize, maxScale int32,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: maxScale,
newRes: r,
limit: newLimiter[expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
start: now(),
}
}
// expoHistogram summarizes a set of measurements as a histogram with
// exponentially defined buckets.
type expoHistogram[N int64 | float64] struct {
noSum bool
noMinMax bool
maxSize int
maxScale int32
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
start time.Time
}
func (e *expoHistogram[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
}
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
v, ok := e.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = e.limit.Attributes(fltrAttr, e.values)
// If we overflowed, make sure we add to the existing overflow series
// if it already exists.
v, ok = e.values[fltrAttr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](fltrAttr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes(fltrAttr)
e.values[fltrAttr.Equivalent()] = v
}
}
v.record(value)
v.res.Offer(ctx, value, droppedAttr)
}
func (e *expoHistogram[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
// In that case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.ExponentialHistogram[N])
h.Temporality = metricdata.DeltaTemporality
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
n := len(e.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(
hDPts[i].PositiveBucket.Counts,
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
}
// Unused attribute sets do not report.
clear(e.values)
e.start = t
h.DataPoints = hDPts
*dest = h
return n
}
func (e *expoHistogram[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
// In that case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.ExponentialHistogram[N])
h.Temporality = metricdata.CumulativeTemporality
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
n := len(e.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(
hDPts[i].PositiveBucket.Counts,
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
h.DataPoints = hDPts
*dest = h
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/reservoir"
)
// FilteredExemplarReservoir wraps an [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}
// filteredExemplarReservoir handles pre-sampled exemplars of measurements.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
// The exemplar.Reservoir is not required to be concurrent-safe, but
// implementations can indicate that they are by embedding
// reservoir.ConcurrentSafe in order to improve performance.
reservoirMux sync.Mutex
concurrentSafe bool
}
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](
f exemplar.Filter,
r exemplar.Reservoir,
) FilteredExemplarReservoir[N] {
_, concurrentSafe := r.(reservoir.ConcurrentSafe)
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
concurrentSafe: concurrentSafe,
}
}
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
ts := time.Now()
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
}
f.reservoir.Offer(ctx, ts, exemplar.NewValue(val), attr)
}
}
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) {
if !f.concurrentSafe {
f.reservoirMux.Lock()
defer f.reservoirMux.Unlock()
}
f.reservoir.Collect(dest)
}
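// exampleFilteredReservoir is an illustrative sketch (hypothetical; it
// assumes the exemplar package's AlwaysOnFilter and NewFixedSizeReservoir):
// the wrapper timestamps and stores every offered measurement because the
// filter always samples.
func exampleFilteredReservoir() {
r := NewFilteredExemplarReservoir[int64](exemplar.AlwaysOnFilter, exemplar.NewFixedSizeReservoir(4))
r.Offer(context.Background(), 42, nil)
var dest []exemplar.Exemplar
r.Collect(&dest)
}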
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"slices"
"sort"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
counts []uint64
count uint64
total N
min, max N
}
// newBuckets returns buckets with n bins.
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
}
func (b *buckets[N]) sum(value N) { b.total += value }
func (b *buckets[N]) bin(idx int) {
b.counts[idx]++
b.count++
}
func (b *buckets[N]) minMax(value N) {
if value < b.min {
b.min = value
} else if value > b.max {
b.max = value
}
}
// histValues summarizes a set of measurements as a histogram with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
noMinMax bool
noSum bool
bounds []float64
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](
bounds []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histValues[N] {
// Keeping all buckets correctly associated with the passed boundaries is
// ultimately this type's responsibility. Make a copy here so we can always
// guarantee this. Or, in the case of failure, have complete control over
// the fix.
b := slices.Clone(bounds)
slices.Sort(b)
return &histValues[N]{
noMinMax: noMinMax,
noSum: noSum,
bounds: b,
newRes: r,
limit: newLimiter[buckets[N]](limit),
values: make(map[attribute.Distinct]*buckets[N]),
}
}
// measure records the measurement value, scoped by fltrAttr, and aggregates
// it into a histogram.
func (s *histValues[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
b, ok := s.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
// If we overflowed, make sure we add to the existing overflow series
// if it already exists.
b, ok = s.values[fltrAttr.Equivalent()]
if !ok {
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](fltrAttr, len(s.bounds)+1)
b.res = s.newRes(fltrAttr)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
s.values[fltrAttr.Equivalent()] = b
}
}
b.bin(idx)
if !s.noMinMax {
b.minMax(value)
}
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, value, droppedAttr)
}
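// exampleBucketIndex is an illustrative sketch (hypothetical, not part of
// the package) of the bucket-index search used by measure above, with
// bounds [0, 5, 10] producing len(bounds)+1 buckets.
func exampleBucketIndex() {
bounds := []float64{0, 5, 10}
_ = sort.SearchFloat64s(bounds, -1) // 0: (-inf, 0]
_ = sort.SearchFloat64s(bounds, 5)  // 1: (0, 5]
_ = sort.SearchFloat64s(bounds, 7)  // 2: (5, 10]
_ = sort.SearchFloat64s(bounds, 11) // 3: (10, +inf)
}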
// newHistogram returns an Aggregator that summarizes a set of measurements
// as a histogram.
func newHistogram[N int64 | float64](
boundaries []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r),
start: now(),
}
}
// histogram summarizes a set of measurements as a histogram with explicitly
// defined buckets.
type histogram[N int64 | float64] struct {
*histValues[N]
start time.Time
}
func (s *histogram[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.DeltaTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = val.counts
if !s.noSum {
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
}
// Unused attribute sets do not report.
clear(s.values)
// The delta collection cycle resets.
s.start = t
h.DataPoints = hDPts
*dest = h
return n
}
func (s *histogram[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.CumulativeTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
//
// TODO (#3047): Making copies for bounds and counts incurs a large
// memory allocation footprint. Alternatives should be explored.
hDPts[i].BucketCounts = slices.Clone(val.counts)
if !s.noSum {
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
h.DataPoints = hDPts
*dest = h
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// datapoint is the recorded measurement data for an attribute set.
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res FilteredExemplarReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]*datapoint[N]),
start: now(),
}
}
// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]*datapoint[N]
start time.Time
}
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
s.Lock()
defer s.Unlock()
d, ok := s.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
d = &datapoint[N]{
res: s.newRes(fltrAttr),
attrs: fltrAttr,
}
}
d.value = value
d.res.Offer(ctx, value, droppedAttr)
s.values[fltrAttr.Equivalent()] = d
}
func (s *lastValue[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *lastValue[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData
return n
}
// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)
var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
}
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}
// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
}
func (s *precomputedLastValue[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *precomputedLastValue[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import "go.opentelemetry.io/otel/attribute"
// overflowSet is the attribute set used to record a measurement when adding
// another distinct attribute set to the aggregate would exceed the aggregate
// limit.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
// limiter limits aggregate values.
type limiter[V any] struct {
// aggLimit is the maximum number of metric streams that can be aggregated.
//
// Once the aggLimit has been met, any metric stream with attributes
// distinct from those already aggregated will instead be aggregated
// into an "overflow" metric stream. That stream will only contain the
// "otel.metric.overflow"=true attribute.
aggLimit int
}
// newLimiter returns a new Limiter with the provided aggregation limit.
func newLimiter[V any](aggregation int) limiter[V] {
return limiter[V]{aggLimit: aggregation}
}
// Attributes checks if adding a measurement for attrs will exceed the
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attrs is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set {
if l.aggLimit > 0 {
_, exists := measurements[attrs.Equivalent()]
if !exists && len(measurements) >= l.aggLimit-1 {
return overflowSet
}
}
return attrs
}
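// exampleLimiter is an illustrative sketch (hypothetical, not part of the
// package): with an aggregation limit of 2, the limiter reserves the final
// slot for the overflow series, so a second distinct attribute set is
// redirected to overflowSet.
func exampleLimiter() {
l := newLimiter[int](2)
seen := map[attribute.Distinct]*int{}
a := attribute.NewSet(attribute.String("k", "a"))
got := l.Attributes(a, seen) // == a: the limit is not yet reached.
seen[got.Equivalent()] = new(int)
b := attribute.NewSet(attribute.String("k", "b"))
_ = l.Attributes(b, seen) // == overflowSet: len(seen) >= aggLimit-1.
}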
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type sumValue[N int64 | float64] struct {
n atomicCounter[N]
res FilteredExemplarReservoir[N]
attrs attribute.Set
}
type valueMap[N int64 | float64] struct {
values limitedSyncMap
newRes func(attribute.Set) FilteredExemplarReservoir[N]
}
func (s *valueMap[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
sv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
return &sumValue[N]{
res: s.newRes(attr),
attrs: attr,
}
}).(*sumValue[N])
sv.n.add(value)
// It is possible for collection to race with measurement and observe the
// exemplar in the batch of metrics after the add() for cumulative sums.
// This is an accepted tradeoff to avoid locking during measurement.
sv.res.Offer(ctx, value, droppedAttr)
}
// newDeltaSum returns an aggregator that summarizes a set of measurements as
// their arithmetic sum. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
func newDeltaSum[N int64 | float64](
monotonic bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *deltaSum[N] {
return &deltaSum[N]{
monotonic: monotonic,
start: now(),
hotColdValMap: [2]valueMap[N]{
{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
},
}
}
// deltaSum is the storage for sums which resets every collection interval.
type deltaSum[N int64 | float64] struct {
monotonic bool
start time.Time
hcwg hotColdWaitGroup
hotColdValMap [2]valueMap[N]
}
func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
hotIdx := s.hcwg.start()
defer s.hcwg.done(hotIdx)
s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr)
}
func (s *deltaSum[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
// delta always clears values on collection
readIdx := s.hcwg.swapHotAndWait()
// The len will not change while we iterate over values, since we waited
// for all writes to the cold values and len to finish.
n := s.hotColdValMap[readIdx].values.Len()
dPts := reset(sData.DataPoints, n, n)
var i int
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
val := value.(*sumValue[N])
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n.load()
i++
return true
})
s.hotColdValMap[readIdx].values.Clear()
// The delta collection cycle resets.
s.start = t
sData.DataPoints = dPts
*dest = sData
return i
}
// newCumulativeSum returns an aggregator that summarizes a set of measurements
// as their arithmetic sum. Each sum is scoped by attributes and the
// aggregation cycle the measurements were made in.
func newCumulativeSum[N int64 | float64](
monotonic bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *cumulativeSum[N] {
return &cumulativeSum[N]{
monotonic: monotonic,
start: now(),
valueMap: valueMap[N]{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
}
}
// cumulativeSum is the storage for sums which never reset.
type cumulativeSum[N int64 | float64] struct {
monotonic bool
start time.Time
valueMap[N]
}
func (s *cumulativeSum[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
// Values are being concurrently written while we iterate, so only use the
// current length for capacity.
dPts := reset(sData.DataPoints, 0, s.values.Len())
var i int
s.values.Range(func(_, value any) bool {
val := value.(*sumValue[N])
newPt := metricdata.DataPoint[N]{
Attributes: val.attrs,
StartTime: s.start,
Time: t,
Value: val.n.load(),
}
collectExemplars(&newPt.Exemplars, val.res.Collect)
dPts = append(dPts, newPt)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
i++
return true
})
sData.DataPoints = dPts
*dest = sData
return i
}
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](
monotonic bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *precomputedSum[N] {
return &precomputedSum[N]{
deltaSum: newDeltaSum(monotonic, limit, r),
}
}
// precomputedSum summarizes a set of observations as their arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*deltaSum[N]
reported map[any]N
}
func (s *precomputedSum[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
newReported := make(map[any]N)
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
// delta always clears values on collection
readIdx := s.hcwg.swapHotAndWait()
// The len will not change while we iterate over values, since we waited
// for all writes to the cold values and len to finish.
n := s.hotColdValMap[readIdx].values.Len()
dPts := reset(sData.DataPoints, n, n)
var i int
s.hotColdValMap[readIdx].values.Range(func(key, value any) bool {
val := value.(*sumValue[N])
n := val.n.load()
delta := n - s.reported[key]
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = delta
newReported[key] = n
i++
return true
})
s.hotColdValMap[readIdx].values.Clear()
s.reported = newReported
// The delta collection cycle resets.
s.start = t
sData.DataPoints = dPts
*dest = sData
return i
}
func (s *precomputedSum[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
// cumulative precomputed always clears values on collection
readIdx := s.hcwg.swapHotAndWait()
// The len will not change while we iterate over values, since we waited
// for all writes to the cold values and len to finish.
n := s.hotColdValMap[readIdx].values.Len()
dPts := reset(sData.DataPoints, n, n)
var i int
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
val := value.(*sumValue[N])
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n.load()
i++
return true
})
s.hotColdValMap[readIdx].values.Clear()
sData.DataPoints = dPts
*dest = sData
return i
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package metricdata provides types for the metric SDK data model.
package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
import (
"encoding/json"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)
// ResourceMetrics is a collection of ScopeMetrics and the associated Resource
// that created them.
type ResourceMetrics struct {
// Resource represents the entity that collected the metrics.
Resource *resource.Resource
// ScopeMetrics are the collection of metrics with unique Scopes.
ScopeMetrics []ScopeMetrics
}
// ScopeMetrics is a collection of Metrics produced by a Meter.
type ScopeMetrics struct {
// Scope is the Scope that the Meter was created with.
Scope instrumentation.Scope
// Metrics are a list of aggregations created by the Meter.
Metrics []Metrics
}
// Metrics is a collection of one or more aggregated timeseries from an Instrument.
type Metrics struct {
// Name is the name of the Instrument that created this data.
Name string
// Description is the description of the Instrument, which can be used in documentation.
Description string
// Unit is the unit in which the Instrument reports.
Unit string
// Data is the aggregated data from an Instrument.
Data Aggregation
}
// Aggregation is the store of data reported by an Instrument.
// It will be one of: Gauge, Sum, Histogram, ExponentialHistogram, or Summary.
type Aggregation interface {
privateAggregation()
}
// Gauge represents a measurement of the current value of an instrument.
type Gauge[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []DataPoint[N]
}
func (Gauge[N]) privateAggregation() {}
// Sum represents the sum of all measurements of values from an instrument.
type Sum[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []DataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
// IsMonotonic represents if this aggregation only increases or decreases.
IsMonotonic bool
}
func (Sum[N]) privateAggregation() {}
// DataPoint is a single data point in a timeseries.
type DataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started. (optional)
StartTime time.Time `json:",omitempty"`
// Time is the time when the timeseries was recorded. (optional)
Time time.Time `json:",omitempty"`
// Value is the value of this data point.
Value N
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
// Histogram represents the histogram of all measurements of values from an instrument.
type Histogram[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []HistogramDataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}
func (Histogram[N]) privateAggregation() {}
// HistogramDataPoint is a single histogram data point in a timeseries.
type HistogramDataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this histogram has been calculated with.
Count uint64
// Bounds are the upper bounds of the buckets of the histogram. Because the
// last boundary is +infinity, it is implied and not stored.
Bounds []float64
// BucketCounts is the count of each of the buckets.
BucketCounts []uint64
// Min is the minimum value recorded. (optional)
Min Extrema[N]
// Max is the maximum value recorded. (optional)
Max Extrema[N]
// Sum is the sum of the values recorded.
Sum N
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
// ExponentialHistogram represents the histogram of all measurements of values from an instrument.
type ExponentialHistogram[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// attributes.
DataPoints []ExponentialHistogramDataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}
func (ExponentialHistogram[N]) privateAggregation() {}
// ExponentialHistogramDataPoint is a single exponential histogram data point in a timeseries.
type ExponentialHistogramDataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this histogram has been calculated with.
Count uint64
// Min is the minimum value recorded. (optional)
Min Extrema[N]
// Max is the maximum value recorded. (optional)
Max Extrema[N]
// Sum is the sum of the values recorded.
Sum N
// Scale describes the resolution of the histogram. Boundaries are
// located at powers of the base, where:
//
// base = 2 ^ (2 ^ -Scale)
Scale int32
// ZeroCount is the number of values whose absolute value
// is less than or equal to [ZeroThreshold].
// When ZeroThreshold is 0, this is the number of values that
// cannot be expressed using the standard exponential formula
// as well as values that have been rounded to zero.
// ZeroCount represents the special zero count bucket.
ZeroCount uint64
// PositiveBucket is the range of positive value bucket counts.
PositiveBucket ExponentialBucket
// NegativeBucket is the range of negative value bucket counts.
NegativeBucket ExponentialBucket
// ZeroThreshold is the width of the zero region. Where the zero region is
// defined as the closed interval [-ZeroThreshold, ZeroThreshold].
ZeroThreshold float64
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
// ExponentialBucket is a set of bucket counts, encoded in a contiguous
// array of counts.
type ExponentialBucket struct {
// Offset is the bucket index of the first entry in the Counts slice.
Offset int32
// Counts is a slice where Counts[i] carries the count of the bucket at
// index (Offset+i). Counts[i] is the count of values greater than
// base^(Offset+i) and less than or equal to base^(Offset+i+1).
Counts []uint64
}
// Extrema is the minimum or maximum value of a dataset.
type Extrema[N int64 | float64] struct {
value N
valid bool
}
// MarshalText converts the Extrema value to text.
func (e Extrema[N]) MarshalText() ([]byte, error) {
if !e.valid {
return json.Marshal(nil)
}
return json.Marshal(e.value)
}
// MarshalJSON converts the Extrema value to JSON number.
func (e *Extrema[N]) MarshalJSON() ([]byte, error) {
return e.MarshalText()
}
// NewExtrema returns an Extrema set to v.
func NewExtrema[N int64 | float64](v N) Extrema[N] {
return Extrema[N]{value: v, valid: true}
}
// Value returns the Extrema value and true if the Extrema is defined.
// Otherwise, if the Extrema is its zero-value, defined will be false.
func (e Extrema[N]) Value() (v N, defined bool) {
return e.value, e.valid
}
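// exampleExtremaJSON is an illustrative sketch (hypothetical, not part of
// the package): an unset Extrema marshals as JSON null, while a set one
// marshals as a plain number.
func exampleExtremaJSON() {
var unset Extrema[int64]
b, _ := json.Marshal(&unset) // "null"
set := NewExtrema[int64](7)
b, _ = json.Marshal(&set) // "7"
_ = b
}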
// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar[N int64 | float64] struct {
// FilteredAttributes are the attributes recorded with the measurement but
// filtered out of the timeseries' aggregated data.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was recorded.
Time time.Time
// Value is the measured value.
Value N
// SpanID is the ID of the span that was active during the measurement. If
// no span was active or the span was not sampled this will be empty.
SpanID []byte `json:",omitempty"`
// TraceID is the ID of the trace the active span belonged to during the
// measurement. If no span was active or the span was not sampled this will
// be empty.
TraceID []byte `json:",omitempty"`
}
// Summary metric data are used to convey quantile summaries,
// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary)
// data type.
//
// These data points cannot always be merged in a meaningful way. The Summary
// type is only used by bridges from other metrics libraries, and cannot be
// produced using OpenTelemetry instrumentation.
type Summary struct {
// DataPoints are the individual aggregated measurements with unique
// attributes.
DataPoints []SummaryDataPoint
}
func (Summary) privateAggregation() {}
// SummaryDataPoint is a single data point in a timeseries that describes the
// time-varying values of a Summary metric.
type SummaryDataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this summary has been calculated with.
Count uint64
// Sum is the sum of the values recorded.
Sum float64
// (Optional) list of values at different quantiles of the distribution calculated
// from the current snapshot. The quantiles must be strictly increasing.
QuantileValues []QuantileValue
}
// QuantileValue is the value at a given quantile of a summary.
type QuantileValue struct {
// Quantile is the quantile of this value.
//
// Must be in the interval [0.0, 1.0].
Quantile float64
// Value is the value at the given quantile of a summary.
//
// Quantile values must NOT be negative.
Value float64
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package metricdatatest provides testing functionality for use with the
// metricdata package.
package metricdatatest // import "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
import (
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// Datatypes are the concrete data-types the metricdata package provides.
type Datatypes interface {
metricdata.DataPoint[float64] |
metricdata.DataPoint[int64] |
metricdata.Gauge[float64] |
metricdata.Gauge[int64] |
metricdata.Histogram[float64] |
metricdata.Histogram[int64] |
metricdata.HistogramDataPoint[float64] |
metricdata.HistogramDataPoint[int64] |
metricdata.Extrema[int64] |
metricdata.Extrema[float64] |
metricdata.Metrics |
metricdata.ResourceMetrics |
metricdata.ScopeMetrics |
metricdata.Sum[float64] |
metricdata.Sum[int64] |
metricdata.Exemplar[float64] |
metricdata.Exemplar[int64] |
metricdata.ExponentialHistogram[float64] |
metricdata.ExponentialHistogram[int64] |
metricdata.ExponentialHistogramDataPoint[float64] |
metricdata.ExponentialHistogramDataPoint[int64] |
metricdata.ExponentialBucket |
metricdata.Summary |
metricdata.SummaryDataPoint |
metricdata.QuantileValue
// Interface types are not allowed in union types, therefore the
// Aggregation and Value type from metricdata are not included here.
}
// TestingT is an interface that mirrors [testing.T] without the private
// method of [testing.TB], so other testing packages can rely on it as well.
//
// The methods in this interface must match the [testing.TB] interface.
type TestingT interface {
Helper()
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
Error(...any)
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
}
type config struct {
ignoreTimestamp bool
ignoreExemplars bool
ignoreValue bool
}
func newConfig(opts []Option) config {
var cfg config
for _, opt := range opts {
cfg = opt.apply(cfg)
}
return cfg
}
// Option allows for fine-grained control over how AssertEqual operates.
type Option interface {
apply(cfg config) config
}
type fnOption func(cfg config) config
func (fn fnOption) apply(cfg config) config {
return fn(cfg)
}
// IgnoreTimestamp disables checking if timestamps are different.
func IgnoreTimestamp() Option {
return fnOption(func(cfg config) config {
cfg.ignoreTimestamp = true
return cfg
})
}
// IgnoreExemplars disables checking if Exemplars are different.
func IgnoreExemplars() Option {
return fnOption(func(cfg config) config {
cfg.ignoreExemplars = true
return cfg
})
}
// IgnoreValue disables checking if values are different. This can be
// useful for non-deterministic values, like measured durations.
//
// This will ignore the value and trace information for Exemplars;
// the buckets, zero count, scale, sum, max, min, and counts of
// ExponentialHistogramDataPoints; the buckets, sum, count, max,
// and min of HistogramDataPoints; the value of DataPoints.
func IgnoreValue() Option {
return fnOption(func(cfg config) config {
cfg.ignoreValue = true
return cfg
})
}
// AssertEqual asserts that the two concrete data-types from the metricdata
// package are equal.
func AssertEqual[T Datatypes](t TestingT, expected, actual T, opts ...Option) bool {
t.Helper()
cfg := newConfig(opts)
// Generic types cannot be type asserted. Use an interface instead.
aIface := any(actual)
var r []string
switch e := any(expected).(type) {
case metricdata.Exemplar[int64]:
r = equalExemplars(e, aIface.(metricdata.Exemplar[int64]), cfg)
case metricdata.Exemplar[float64]:
r = equalExemplars(e, aIface.(metricdata.Exemplar[float64]), cfg)
case metricdata.DataPoint[int64]:
r = equalDataPoints(e, aIface.(metricdata.DataPoint[int64]), cfg)
case metricdata.DataPoint[float64]:
r = equalDataPoints(e, aIface.(metricdata.DataPoint[float64]), cfg)
case metricdata.Gauge[int64]:
r = equalGauges(e, aIface.(metricdata.Gauge[int64]), cfg)
case metricdata.Gauge[float64]:
r = equalGauges(e, aIface.(metricdata.Gauge[float64]), cfg)
case metricdata.Histogram[float64]:
r = equalHistograms(e, aIface.(metricdata.Histogram[float64]), cfg)
case metricdata.Histogram[int64]:
r = equalHistograms(e, aIface.(metricdata.Histogram[int64]), cfg)
case metricdata.HistogramDataPoint[float64]:
r = equalHistogramDataPoints(e, aIface.(metricdata.HistogramDataPoint[float64]), cfg)
case metricdata.HistogramDataPoint[int64]:
r = equalHistogramDataPoints(e, aIface.(metricdata.HistogramDataPoint[int64]), cfg)
case metricdata.Extrema[int64]:
r = equalExtrema(e, aIface.(metricdata.Extrema[int64]), cfg)
case metricdata.Extrema[float64]:
r = equalExtrema(e, aIface.(metricdata.Extrema[float64]), cfg)
case metricdata.Metrics:
r = equalMetrics(e, aIface.(metricdata.Metrics), cfg)
case metricdata.ResourceMetrics:
r = equalResourceMetrics(e, aIface.(metricdata.ResourceMetrics), cfg)
case metricdata.ScopeMetrics:
r = equalScopeMetrics(e, aIface.(metricdata.ScopeMetrics), cfg)
case metricdata.Sum[int64]:
r = equalSums(e, aIface.(metricdata.Sum[int64]), cfg)
case metricdata.Sum[float64]:
r = equalSums(e, aIface.(metricdata.Sum[float64]), cfg)
case metricdata.ExponentialHistogram[float64]:
r = equalExponentialHistograms(e, aIface.(metricdata.ExponentialHistogram[float64]), cfg)
case metricdata.ExponentialHistogram[int64]:
r = equalExponentialHistograms(e, aIface.(metricdata.ExponentialHistogram[int64]), cfg)
case metricdata.ExponentialHistogramDataPoint[float64]:
r = equalExponentialHistogramDataPoints(e, aIface.(metricdata.ExponentialHistogramDataPoint[float64]), cfg)
case metricdata.ExponentialHistogramDataPoint[int64]:
r = equalExponentialHistogramDataPoints(e, aIface.(metricdata.ExponentialHistogramDataPoint[int64]), cfg)
case metricdata.ExponentialBucket:
r = equalExponentialBuckets(e, aIface.(metricdata.ExponentialBucket), cfg)
case metricdata.Summary:
r = equalSummary(e, aIface.(metricdata.Summary), cfg)
case metricdata.SummaryDataPoint:
r = equalSummaryDataPoint(e, aIface.(metricdata.SummaryDataPoint), cfg)
case metricdata.QuantileValue:
r = equalQuantileValue(e, aIface.(metricdata.QuantileValue), cfg)
default:
// We control all types passed to this; panic to signal to developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown types: %T", expected))
}
if len(r) > 0 {
t.Error(r)
return false
}
return true
}
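// A typical use in a test, with hypothetical want and got values collected
// elsewhere:
//
//	func TestCounter(t *testing.T) {
//		// Ignore non-deterministic timestamps when comparing.
//		metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
//	}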
// AssertAggregationsEqual asserts that two Aggregations are equal.
func AssertAggregationsEqual(t TestingT, expected, actual metricdata.Aggregation, opts ...Option) bool {
t.Helper()
cfg := newConfig(opts)
if r := equalAggregations(expected, actual, cfg); len(r) > 0 {
t.Error(r)
return false
}
return true
}
// AssertHasAttributes asserts that all DataPoints or HistogramDataPoints have all passed attrs.
func AssertHasAttributes[T Datatypes](t TestingT, actual T, attrs ...attribute.KeyValue) bool {
t.Helper()
var reasons []string
switch e := any(actual).(type) {
case metricdata.Exemplar[int64]:
reasons = hasAttributesExemplars(e, attrs...)
case metricdata.Exemplar[float64]:
reasons = hasAttributesExemplars(e, attrs...)
case metricdata.DataPoint[int64]:
reasons = hasAttributesDataPoints(e, attrs...)
case metricdata.DataPoint[float64]:
reasons = hasAttributesDataPoints(e, attrs...)
case metricdata.Gauge[int64]:
reasons = hasAttributesGauge(e, attrs...)
case metricdata.Gauge[float64]:
reasons = hasAttributesGauge(e, attrs...)
case metricdata.Sum[int64]:
reasons = hasAttributesSum(e, attrs...)
case metricdata.Sum[float64]:
reasons = hasAttributesSum(e, attrs...)
case metricdata.HistogramDataPoint[int64]:
reasons = hasAttributesHistogramDataPoints(e, attrs...)
case metricdata.HistogramDataPoint[float64]:
reasons = hasAttributesHistogramDataPoints(e, attrs...)
case metricdata.Extrema[int64], metricdata.Extrema[float64]:
// Nothing to check.
case metricdata.Histogram[int64]:
reasons = hasAttributesHistogram(e, attrs...)
case metricdata.Histogram[float64]:
reasons = hasAttributesHistogram(e, attrs...)
case metricdata.Metrics:
reasons = hasAttributesMetrics(e, attrs...)
case metricdata.ScopeMetrics:
reasons = hasAttributesScopeMetrics(e, attrs...)
case metricdata.ResourceMetrics:
reasons = hasAttributesResourceMetrics(e, attrs...)
case metricdata.ExponentialHistogram[int64]:
reasons = hasAttributesExponentialHistogram(e, attrs...)
case metricdata.ExponentialHistogram[float64]:
reasons = hasAttributesExponentialHistogram(e, attrs...)
case metricdata.ExponentialHistogramDataPoint[int64]:
reasons = hasAttributesExponentialHistogramDataPoints(e, attrs...)
case metricdata.ExponentialHistogramDataPoint[float64]:
reasons = hasAttributesExponentialHistogramDataPoints(e, attrs...)
case metricdata.ExponentialBucket:
// Nothing to check.
case metricdata.Summary:
reasons = hasAttributesSummary(e, attrs...)
case metricdata.SummaryDataPoint:
reasons = hasAttributesSummaryDataPoint(e, attrs...)
case metricdata.QuantileValue:
// Nothing to check.
default:
// We control all types passed to this; panic to signal to developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown types: %T", actual))
}
if len(reasons) > 0 {
t.Error(reasons)
return false
}
return true
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metricdatatest // import "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
import (
"bytes"
"fmt"
"reflect"
"slices"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// equalResourceMetrics returns reasons ResourceMetrics are not equal. If they
// are equal, the returned reasons will be empty.
//
// The ScopeMetrics each ResourceMetrics contains are compared based on
// containing the same ScopeMetrics, not the order they are stored in.
func equalResourceMetrics(a, b metricdata.ResourceMetrics, cfg config) (reasons []string) {
if !a.Resource.Equal(b.Resource) {
reasons = append(reasons, notEqualStr("Resources", a.Resource, b.Resource))
}
r := compareDiff(diffSlices(
a.ScopeMetrics,
b.ScopeMetrics,
func(a, b metricdata.ScopeMetrics) bool {
r := equalScopeMetrics(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "ResourceMetrics ScopeMetrics not equal:\n"+r)
}
return reasons
}
// equalScopeMetrics returns reasons ScopeMetrics are not equal. If they are
// equal, the returned reasons will be empty.
//
// The Metrics each ScopeMetrics contains are compared based on containing the
// same Metrics, not the order they are stored in.
func equalScopeMetrics(a, b metricdata.ScopeMetrics, cfg config) (reasons []string) {
if a.Scope != b.Scope {
reasons = append(reasons, notEqualStr("Scope", a.Scope, b.Scope))
}
r := compareDiff(diffSlices(
a.Metrics,
b.Metrics,
func(a, b metricdata.Metrics) bool {
r := equalMetrics(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "ScopeMetrics Metrics not equal:\n"+r)
}
return reasons
}
// equalMetrics returns reasons Metrics are not equal. If they are equal, the
// returned reasons will be empty.
func equalMetrics(a, b metricdata.Metrics, cfg config) (reasons []string) {
if a.Name != b.Name {
reasons = append(reasons, notEqualStr("Name", a.Name, b.Name))
}
if a.Description != b.Description {
reasons = append(reasons, notEqualStr("Description", a.Description, b.Description))
}
if a.Unit != b.Unit {
reasons = append(reasons, notEqualStr("Unit", a.Unit, b.Unit))
}
r := equalAggregations(a.Data, b.Data, cfg)
if len(r) > 0 {
reasons = append(reasons, "Metrics Data not equal:")
reasons = append(reasons, r...)
}
return reasons
}
// equalAggregations returns reasons a and b are not equal. If they are equal,
// the returned reasons will be empty.
func equalAggregations(a, b metricdata.Aggregation, cfg config) (reasons []string) {
if a == nil || b == nil {
if a != b {
return []string{notEqualStr("Aggregation", a, b)}
}
return reasons
}
if reflect.TypeOf(a) != reflect.TypeOf(b) {
return []string{fmt.Sprintf("Aggregation types not equal:\nexpected: %T\nactual: %T", a, b)}
}
switch v := a.(type) {
case metricdata.Gauge[int64]:
r := equalGauges(v, b.(metricdata.Gauge[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Gauge[int64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Gauge[float64]:
r := equalGauges(v, b.(metricdata.Gauge[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Gauge[float64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Sum[int64]:
r := equalSums(v, b.(metricdata.Sum[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Sum[int64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Sum[float64]:
r := equalSums(v, b.(metricdata.Sum[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Sum[float64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Histogram[int64]:
r := equalHistograms(v, b.(metricdata.Histogram[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Histogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.Histogram[float64]:
r := equalHistograms(v, b.(metricdata.Histogram[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Histogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.ExponentialHistogram[int64]:
r := equalExponentialHistograms(v, b.(metricdata.ExponentialHistogram[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "ExponentialHistogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.ExponentialHistogram[float64]:
r := equalExponentialHistograms(v, b.(metricdata.ExponentialHistogram[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "ExponentialHistogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.Summary:
r := equalSummary(v, b.(metricdata.Summary), cfg)
if len(r) > 0 {
reasons = append(reasons, "Summary not equal:")
reasons = append(reasons, r...)
}
default:
reasons = append(reasons, fmt.Sprintf("Aggregation of unknown types %T", a))
}
return reasons
}
// equalGauges returns reasons Gauges are not equal. If they are equal, the
// returned reasons will be empty.
//
// The DataPoints each Gauge contains are compared based on containing the
// same DataPoints, not the order they are stored in.
func equalGauges[N int64 | float64](a, b metricdata.Gauge[N], cfg config) (reasons []string) {
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.DataPoint[N]) bool {
r := equalDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Gauge DataPoints not equal:\n"+r)
}
return reasons
}
// equalSums returns reasons Sums are not equal. If they are equal, the
// returned reasons will be empty.
//
// The DataPoints each Sum contains are compared based on containing the same
// DataPoints, not the order they are stored in.
func equalSums[N int64 | float64](a, b metricdata.Sum[N], cfg config) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
if a.IsMonotonic != b.IsMonotonic {
reasons = append(reasons, notEqualStr("IsMonotonic", a.IsMonotonic, b.IsMonotonic))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.DataPoint[N]) bool {
r := equalDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Sum DataPoints not equal:\n"+r)
}
return reasons
}
// equalHistograms returns reasons Histograms are not equal. If they are
// equal, the returned reasons will be empty.
//
// The DataPoints each Histogram contains are compared based on containing the
// same HistogramDataPoint, not the order they are stored in.
func equalHistograms[N int64 | float64](a, b metricdata.Histogram[N], cfg config) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.HistogramDataPoint[N]) bool {
r := equalHistogramDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Histogram DataPoints not equal:\n"+r)
}
return reasons
}
// equalDataPoints returns reasons DataPoints are not equal. If they are
// equal, the returned reasons will be empty.
func equalDataPoints[N int64 | float64](
a, b metricdata.DataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
// equalHistogramDataPoints returns reasons HistogramDataPoints are not equal.
// If they are equal, the returned reasons will be empty.
func equalHistogramDataPoints[N int64 | float64](
a, b metricdata.HistogramDataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if !slices.Equal(a.Bounds, b.Bounds) {
reasons = append(reasons, notEqualStr("Bounds", a.Bounds, b.Bounds))
}
if !slices.Equal(a.BucketCounts, b.BucketCounts) {
reasons = append(reasons, notEqualStr("BucketCounts", a.BucketCounts, b.BucketCounts))
}
if !eqExtrema(a.Min, b.Min) {
reasons = append(reasons, notEqualStr("Min", a.Min, b.Min))
}
if !eqExtrema(a.Max, b.Max) {
reasons = append(reasons, notEqualStr("Max", a.Max, b.Max))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
// equalExponentialHistograms returns reasons ExponentialHistograms are not
// equal. If they are equal, the returned reasons will be empty.
//
// The DataPoints each ExponentialHistogram contains are compared based on
// containing the same ExponentialHistogramDataPoint, not the order they are
// stored in.
func equalExponentialHistograms[N int64 | float64](
a, b metricdata.ExponentialHistogram[N],
cfg config,
) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.ExponentialHistogramDataPoint[N]) bool {
r := equalExponentialHistogramDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Histogram DataPoints not equal:\n"+r)
}
return reasons
}
// equalExponentialHistogramDataPoints returns reasons
// ExponentialHistogramDataPoints are not equal. If they are equal, the
// returned reasons will be empty.
func equalExponentialHistogramDataPoints[N int64 | float64](
a, b metricdata.ExponentialHistogramDataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if !eqExtrema(a.Min, b.Min) {
reasons = append(reasons, notEqualStr("Min", a.Min, b.Min))
}
if !eqExtrema(a.Max, b.Max) {
reasons = append(reasons, notEqualStr("Max", a.Max, b.Max))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
if a.Scale != b.Scale {
reasons = append(reasons, notEqualStr("Scale", a.Scale, b.Scale))
}
if a.ZeroCount != b.ZeroCount {
reasons = append(reasons, notEqualStr("ZeroCount", a.ZeroCount, b.ZeroCount))
}
r := equalExponentialBuckets(a.PositiveBucket, b.PositiveBucket, cfg)
if len(r) > 0 {
reasons = append(reasons, r...)
}
r = equalExponentialBuckets(a.NegativeBucket, b.NegativeBucket, cfg)
if len(r) > 0 {
reasons = append(reasons, r...)
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
func equalExponentialBuckets(a, b metricdata.ExponentialBucket, _ config) (reasons []string) {
if a.Offset != b.Offset {
reasons = append(reasons, notEqualStr("Offset", a.Offset, b.Offset))
}
if !slices.Equal(a.Counts, b.Counts) {
reasons = append(reasons, notEqualStr("Counts", a.Counts, b.Counts))
}
return reasons
}
func equalSummary(a, b metricdata.Summary, cfg config) (reasons []string) {
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.SummaryDataPoint) bool {
r := equalSummaryDataPoint(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Summary DataPoints not equal:\n"+r)
}
return reasons
}
func equalSummaryDataPoint(a, b metricdata.SummaryDataPoint, cfg config) (reasons []string) {
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
r := compareDiff(diffSlices(
a.QuantileValues,
b.QuantileValues,
func(a, b metricdata.QuantileValue) bool {
r := equalQuantileValue(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, r)
}
}
return reasons
}
func equalQuantileValue(a, b metricdata.QuantileValue, _ config) (reasons []string) {
if a.Quantile != b.Quantile {
reasons = append(reasons, notEqualStr("Quantile", a.Quantile, b.Quantile))
}
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
return reasons
}
func notEqualStr(prefix string, expected, actual any) string {
return fmt.Sprintf("%s not equal:\nexpected: %v\nactual: %v", prefix, expected, actual)
}
func equalExtrema[N int64 | float64](a, b metricdata.Extrema[N], _ config) (reasons []string) {
if !eqExtrema(a, b) {
reasons = append(reasons, notEqualStr("Extrema", a, b))
}
return reasons
}
func eqExtrema[N int64 | float64](a, b metricdata.Extrema[N]) bool {
aV, aOk := a.Value()
bV, bOk := b.Value()
if !aOk || !bOk {
return aOk == bOk
}
return aV == bV
}
func equalKeyValue(a, b attribute.KeyValue) bool {
if a.Key != b.Key {
return false
}
if a.Value.Type() != b.Value.Type() {
return false
}
switch a.Value.Type() {
case attribute.BOOL:
if a.Value.AsBool() != b.Value.AsBool() {
return false
}
case attribute.INT64:
if a.Value.AsInt64() != b.Value.AsInt64() {
return false
}
case attribute.FLOAT64:
if a.Value.AsFloat64() != b.Value.AsFloat64() {
return false
}
case attribute.STRING:
if a.Value.AsString() != b.Value.AsString() {
return false
}
case attribute.BOOLSLICE:
if ok := slices.Equal(a.Value.AsBoolSlice(), b.Value.AsBoolSlice()); !ok {
return false
}
case attribute.INT64SLICE:
if ok := slices.Equal(a.Value.AsInt64Slice(), b.Value.AsInt64Slice()); !ok {
return false
}
case attribute.FLOAT64SLICE:
if ok := slices.Equal(a.Value.AsFloat64Slice(), b.Value.AsFloat64Slice()); !ok {
return false
}
case attribute.STRINGSLICE:
if ok := slices.Equal(a.Value.AsStringSlice(), b.Value.AsStringSlice()); !ok {
return false
}
default:
// We control all types passed to this; panic to signal to developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown attribute value type: %s", a.Value.Type()))
}
return true
}
func equalExemplars[N int64 | float64](a, b metricdata.Exemplar[N], cfg config) (reasons []string) {
if !slices.EqualFunc(a.FilteredAttributes, b.FilteredAttributes, equalKeyValue) {
reasons = append(reasons, notEqualStr("FilteredAttributes", a.FilteredAttributes, b.FilteredAttributes))
}
if !cfg.ignoreTimestamp {
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
}
if !slices.Equal(a.SpanID, b.SpanID) {
reasons = append(reasons, notEqualStr("SpanID", a.SpanID, b.SpanID))
}
if !slices.Equal(a.TraceID, b.TraceID) {
reasons = append(reasons, notEqualStr("TraceID", a.TraceID, b.TraceID))
}
return reasons
}
func diffSlices[T any](a, b []T, equal func(T, T) bool) (extraA, extraB []T) {
visited := make([]bool, len(b))
for i := range a {
found := false
for j := range b {
if visited[j] {
continue
}
if equal(a[i], b[j]) {
visited[j] = true
found = true
break
}
}
if !found {
extraA = append(extraA, a[i])
}
}
for j := range b {
if visited[j] {
continue
}
extraB = append(extraB, b[j])
}
return extraA, extraB
}
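// diffSlices is an order-insensitive comparison: each element of a is matched
// against at most one unvisited element of b. A minimal sketch of the
// semantics:
//
//	extraA, extraB := diffSlices(
//		[]int{1, 2, 2},
//		[]int{2, 3},
//		func(x, y int) bool { return x == y },
//	)
//	// extraA == []int{1, 2}, extraB == []int{3}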
func compareDiff[T any](extraExpected, extraActual []T) string {
if len(extraExpected) == 0 && len(extraActual) == 0 {
return ""
}
formatter := func(v T) string {
return fmt.Sprintf("%#v", v)
}
var msg bytes.Buffer
if len(extraExpected) > 0 {
_, _ = msg.WriteString("missing expected values:\n")
for _, v := range extraExpected {
_, _ = msg.WriteString(formatter(v) + "\n")
}
}
if len(extraActual) > 0 {
_, _ = msg.WriteString("unexpected additional values:\n")
for _, v := range extraActual {
_, _ = msg.WriteString(formatter(v) + "\n")
}
}
return msg.String()
}
func missingAttrStr(name string) string {
return "missing attribute " + name
}
func hasAttributesExemplars[T int64 | float64](
exemplar metricdata.Exemplar[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
s := attribute.NewSet(exemplar.FilteredAttributes...)
for _, attr := range attrs {
val, ok := s.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesDataPoints[T int64 | float64](
dp metricdata.DataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesGauge[T int64 | float64](gauge metricdata.Gauge[T], attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range gauge.DataPoints {
reas := hasAttributesDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("gauge datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSum[T int64 | float64](sum metricdata.Sum[T], attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range sum.DataPoints {
reas := hasAttributesDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("sum datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesHistogramDataPoints[T int64 | float64](
dp metricdata.HistogramDataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesHistogram[T int64 | float64](
histogram metricdata.Histogram[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for n, dp := range histogram.DataPoints {
reas := hasAttributesHistogramDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("histogram datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesExponentialHistogramDataPoints[T int64 | float64](
dp metricdata.ExponentialHistogramDataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesExponentialHistogram[T int64 | float64](
histogram metricdata.ExponentialHistogram[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for n, dp := range histogram.DataPoints {
reas := hasAttributesExponentialHistogramDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("histogram datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesAggregation(agg metricdata.Aggregation, attrs ...attribute.KeyValue) (reasons []string) {
switch agg := agg.(type) {
case metricdata.Gauge[int64]:
reasons = hasAttributesGauge(agg, attrs...)
case metricdata.Gauge[float64]:
reasons = hasAttributesGauge(agg, attrs...)
case metricdata.Sum[int64]:
reasons = hasAttributesSum(agg, attrs...)
case metricdata.Sum[float64]:
reasons = hasAttributesSum(agg, attrs...)
case metricdata.Histogram[int64]:
reasons = hasAttributesHistogram(agg, attrs...)
case metricdata.Histogram[float64]:
reasons = hasAttributesHistogram(agg, attrs...)
case metricdata.ExponentialHistogram[int64]:
reasons = hasAttributesExponentialHistogram(agg, attrs...)
case metricdata.ExponentialHistogram[float64]:
reasons = hasAttributesExponentialHistogram(agg, attrs...)
case metricdata.Summary:
reasons = hasAttributesSummary(agg, attrs...)
default:
reasons = []string{fmt.Sprintf("unknown aggregation %T", agg)}
}
return reasons
}
func hasAttributesMetrics(metrics metricdata.Metrics, attrs ...attribute.KeyValue) (reasons []string) {
reas := hasAttributesAggregation(metrics.Data, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("Metric %s:\n", metrics.Name))
reasons = append(reasons, reas...)
}
return reasons
}
func hasAttributesScopeMetrics(sm metricdata.ScopeMetrics, attrs ...attribute.KeyValue) (reasons []string) {
for n, metrics := range sm.Metrics {
reas := hasAttributesMetrics(metrics, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("ScopeMetrics %s Metrics %d:\n", sm.Scope.Name, n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesResourceMetrics(rm metricdata.ResourceMetrics, attrs ...attribute.KeyValue) (reasons []string) {
for n, sm := range rm.ScopeMetrics {
reas := hasAttributesScopeMetrics(sm, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("ResourceMetrics ScopeMetrics %d:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSummary(summary metricdata.Summary, attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range summary.DataPoints {
reas := hasAttributesSummaryDataPoint(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("summary datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSummaryDataPoint(dp metricdata.SummaryDataPoint, attrs ...attribute.KeyValue) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:generate stringer -type=Temporality
package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
// Temporality defines the window that an aggregation was calculated over.
type Temporality uint8
const (
// undefinedTemporality represents an unset Temporality.
//nolint:unused
undefinedTemporality Temporality = iota
// CumulativeTemporality defines a measurement interval that continues to
// expand forward in time from a starting point. New measurements are
// added to all previous measurements since a start time.
CumulativeTemporality
// DeltaTemporality defines a measurement interval that resets each cycle.
// Measurements from one cycle are recorded independently, measurements
// from other cycles do not affect them.
DeltaTemporality
)
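// For example, a monotonic counter incremented once per collection cycle
// reports 1, 2, 3, ... under CumulativeTemporality and 1, 1, 1, ... under
// DeltaTemporality.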
// MarshalText returns the text encoding of t.
func (t Temporality) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}
// Code generated by "stringer -type=Temporality"; DO NOT EDIT.
package metricdata
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[undefinedTemporality-0]
_ = x[CumulativeTemporality-1]
_ = x[DeltaTemporality-2]
}
const _Temporality_name = "undefinedTemporalityCumulativeTemporalityDeltaTemporality"
var _Temporality_index = [...]uint8{0, 20, 41, 57}
func (i Temporality) String() string {
if i >= Temporality(len(_Temporality_index)-1) {
return "Temporality(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Temporality_name[_Temporality_index[i]:_Temporality_index[i+1]]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"context"
"encoding/json"
"fmt"
"math"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unicode/utf8"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/trace/embedded"
"go.opentelemetry.io/otel/trace/internal/telemetry"
)
// newAutoTracerProvider returns an auto-instrumentable [trace.TracerProvider].
// If a [go.opentelemetry.io/auto.Instrumentation] is configured to instrument
// the process using the returned TracerProvider, all of the telemetry it
// produces will be processed and handled by that Instrumentation. By default,
// if no Instrumentation instruments the TracerProvider, it will not generate
// any trace telemetry.
func newAutoTracerProvider() TracerProvider { return tracerProviderInstance }
var tracerProviderInstance = new(autoTracerProvider)
type autoTracerProvider struct{ embedded.TracerProvider }
var _ TracerProvider = autoTracerProvider{}
func (autoTracerProvider) Tracer(name string, opts ...TracerOption) Tracer {
cfg := NewTracerConfig(opts...)
return autoTracer{
name: name,
version: cfg.InstrumentationVersion(),
schemaURL: cfg.SchemaURL(),
}
}
type autoTracer struct {
embedded.Tracer
name, schemaURL, version string
}
var _ Tracer = autoTracer{}
func (t autoTracer) Start(ctx context.Context, name string, opts ...SpanStartOption) (context.Context, Span) {
var psc, sc SpanContext
sampled := true
span := new(autoSpan)
// Ask eBPF for sampling decision and span context info.
t.start(ctx, span, &psc, &sampled, &sc)
span.sampled.Store(sampled)
span.spanContext = sc
ctx = ContextWithSpan(ctx, span)
if sampled {
// Only build traces if sampled.
cfg := NewSpanStartConfig(opts...)
span.traces, span.span = t.traces(name, cfg, span.spanContext, psc)
}
return ctx, span
}
// Expected to be implemented in eBPF.
//
//go:noinline
func (*autoTracer) start(
ctx context.Context,
spanPtr *autoSpan,
psc *SpanContext,
sampled *bool,
sc *SpanContext,
) {
start(ctx, spanPtr, psc, sampled, sc)
}
// start is used for testing.
var start = func(context.Context, *autoSpan, *SpanContext, *bool, *SpanContext) {}
func (t autoTracer) traces(name string, cfg SpanConfig, sc, psc SpanContext) (*telemetry.Traces, *telemetry.Span) {
span := &telemetry.Span{
TraceID: telemetry.TraceID(sc.TraceID()),
SpanID: telemetry.SpanID(sc.SpanID()),
Flags: uint32(sc.TraceFlags()),
TraceState: sc.TraceState().String(),
ParentSpanID: telemetry.SpanID(psc.SpanID()),
Name: name,
Kind: spanKind(cfg.SpanKind()),
}
span.Attrs, span.DroppedAttrs = convCappedAttrs(maxSpan.Attrs, cfg.Attributes())
links := cfg.Links()
if limit := maxSpan.Links; limit == 0 {
n := int64(len(links))
if n > 0 {
span.DroppedLinks = uint32(min(n, math.MaxUint32)) // nolint: gosec // Bounds checked.
}
} else {
if limit > 0 {
n := int64(max(len(links)-limit, 0))
span.DroppedLinks = uint32(min(n, math.MaxUint32)) // nolint: gosec // Bounds checked.
links = links[n:]
}
span.Links = convLinks(links)
}
if t := cfg.Timestamp(); !t.IsZero() {
span.StartTime = cfg.Timestamp()
} else {
span.StartTime = time.Now()
}
return &telemetry.Traces{
ResourceSpans: []*telemetry.ResourceSpans{
{
ScopeSpans: []*telemetry.ScopeSpans{
{
Scope: &telemetry.Scope{
Name: t.name,
Version: t.version,
},
Spans: []*telemetry.Span{span},
SchemaURL: t.schemaURL,
},
},
},
},
}, span
}
func spanKind(kind SpanKind) telemetry.SpanKind {
switch kind {
case SpanKindInternal:
return telemetry.SpanKindInternal
case SpanKindServer:
return telemetry.SpanKindServer
case SpanKindClient:
return telemetry.SpanKindClient
case SpanKindProducer:
return telemetry.SpanKindProducer
case SpanKindConsumer:
return telemetry.SpanKindConsumer
}
return telemetry.SpanKind(0) // undefined.
}
type autoSpan struct {
embedded.Span
spanContext SpanContext
sampled atomic.Bool
mu sync.Mutex
traces *telemetry.Traces
span *telemetry.Span
}
func (s *autoSpan) SpanContext() SpanContext {
if s == nil {
return SpanContext{}
}
// s.spanContext is immutable, do not acquire lock s.mu.
return s.spanContext
}
func (s *autoSpan) IsRecording() bool {
if s == nil {
return false
}
return s.sampled.Load()
}
func (s *autoSpan) SetStatus(c codes.Code, msg string) {
if s == nil || !s.sampled.Load() {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.span.Status == nil {
s.span.Status = new(telemetry.Status)
}
s.span.Status.Message = msg
switch c {
case codes.Unset:
s.span.Status.Code = telemetry.StatusCodeUnset
case codes.Error:
s.span.Status.Code = telemetry.StatusCodeError
case codes.Ok:
s.span.Status.Code = telemetry.StatusCodeOK
}
}
func (s *autoSpan) SetAttributes(attrs ...attribute.KeyValue) {
if s == nil || !s.sampled.Load() {
return
}
s.mu.Lock()
defer s.mu.Unlock()
limit := maxSpan.Attrs
if limit == 0 {
// No attributes allowed.
n := int64(len(attrs))
if n > 0 {
s.span.DroppedAttrs += uint32(min(n, math.MaxUint32)) // nolint: gosec // Bounds checked.
}
return
}
m := make(map[string]int)
for i, a := range s.span.Attrs {
m[a.Key] = i
}
for _, a := range attrs {
val := convAttrValue(a.Value)
if val.Empty() {
s.span.DroppedAttrs++
continue
}
if idx, ok := m[string(a.Key)]; ok {
s.span.Attrs[idx] = telemetry.Attr{
Key: string(a.Key),
Value: val,
}
} else if limit < 0 || len(s.span.Attrs) < limit {
s.span.Attrs = append(s.span.Attrs, telemetry.Attr{
Key: string(a.Key),
Value: val,
})
m[string(a.Key)] = len(s.span.Attrs) - 1
} else {
s.span.DroppedAttrs++
}
}
}
// convCappedAttrs converts up to limit attrs into a []telemetry.Attr. The
// number of dropped attributes is also returned.
func convCappedAttrs(limit int, attrs []attribute.KeyValue) ([]telemetry.Attr, uint32) {
n := len(attrs)
if limit == 0 {
var out uint32
if n > 0 {
out = uint32(min(int64(n), math.MaxUint32)) // nolint: gosec // Bounds checked.
}
return nil, out
}
if limit < 0 {
// Unlimited.
return convAttrs(attrs), 0
}
if n < 0 {
n = 0
}
limit = min(n, limit)
return convAttrs(attrs[:limit]), uint32(n - limit) // nolint: gosec // Bounds checked.
}
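// The limit semantics mirror the span limits below: zero drops every
// attribute, a negative limit means unlimited, and a positive limit keeps the
// first limit attributes. A minimal sketch:
//
//	attrs := []attribute.KeyValue{attribute.Int("a", 1), attribute.Int("b", 2)}
//	kept, dropped := convCappedAttrs(1, attrs)
//	// len(kept) == 1, dropped == 1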
func convAttrs(attrs []attribute.KeyValue) []telemetry.Attr {
if len(attrs) == 0 {
// Avoid allocations if not necessary.
return nil
}
out := make([]telemetry.Attr, 0, len(attrs))
for _, attr := range attrs {
key := string(attr.Key)
val := convAttrValue(attr.Value)
if val.Empty() {
continue
}
out = append(out, telemetry.Attr{Key: key, Value: val})
}
return out
}
func convAttrValue(value attribute.Value) telemetry.Value {
switch value.Type() {
case attribute.BOOL:
return telemetry.BoolValue(value.AsBool())
case attribute.INT64:
return telemetry.Int64Value(value.AsInt64())
case attribute.FLOAT64:
return telemetry.Float64Value(value.AsFloat64())
case attribute.STRING:
v := truncate(maxSpan.AttrValueLen, value.AsString())
return telemetry.StringValue(v)
case attribute.BOOLSLICE:
slice := value.AsBoolSlice()
out := make([]telemetry.Value, 0, len(slice))
for _, v := range slice {
out = append(out, telemetry.BoolValue(v))
}
return telemetry.SliceValue(out...)
case attribute.INT64SLICE:
slice := value.AsInt64Slice()
out := make([]telemetry.Value, 0, len(slice))
for _, v := range slice {
out = append(out, telemetry.Int64Value(v))
}
return telemetry.SliceValue(out...)
case attribute.FLOAT64SLICE:
slice := value.AsFloat64Slice()
out := make([]telemetry.Value, 0, len(slice))
for _, v := range slice {
out = append(out, telemetry.Float64Value(v))
}
return telemetry.SliceValue(out...)
case attribute.STRINGSLICE:
slice := value.AsStringSlice()
out := make([]telemetry.Value, 0, len(slice))
for _, v := range slice {
v = truncate(maxSpan.AttrValueLen, v)
out = append(out, telemetry.StringValue(v))
}
return telemetry.SliceValue(out...)
}
return telemetry.Value{}
}
// truncate returns a truncated version of s such that it contains at most
// limit characters. Truncation is applied by returning the first limit valid
// characters contained in s.
//
// If limit is negative, it returns the original string.
//
// UTF-8 is supported. When truncating, all invalid characters are dropped
// before applying truncation.
//
// If s already contains no more than limit bytes, it is returned unchanged.
// No invalid characters are removed.
func truncate(limit int, s string) string {
// This prioritizes performance in the following order based on the most
// common expected use-cases.
//
// - Short values less than the default limit (128).
// - Strings with valid encodings that exceed the limit.
// - No limit.
// - Strings with invalid encodings that exceed the limit.
if limit < 0 || len(s) <= limit {
return s
}
// Optimistically, assume all valid UTF-8.
var b strings.Builder
count := 0
for i, c := range s {
if c != utf8.RuneError {
count++
if count > limit {
return s[:i]
}
continue
}
_, size := utf8.DecodeRuneInString(s[i:])
if size == 1 {
// Invalid encoding.
b.Grow(len(s) - 1)
_, _ = b.WriteString(s[:i])
s = s[i:]
break
}
}
// Fast-path, no invalid input.
if b.Cap() == 0 {
return s
}
// Truncate while validating UTF-8.
for i := 0; i < len(s) && count < limit; {
c := s[i]
if c < utf8.RuneSelf {
// Optimization for single byte runes (common case).
_ = b.WriteByte(c)
i++
count++
continue
}
_, size := utf8.DecodeRuneInString(s[i:])
if size == 1 {
// We checked for all 1-byte runes above, this is a RuneError.
i++
continue
}
_, _ = b.WriteString(s[i : i+size])
i += size
count++
}
return b.String()
}
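// A minimal sketch of the behavior (limits count valid UTF-8 characters, not
// bytes):
//
//	truncate(3, "héllo")  // "hél"
//	truncate(-1, "héllo") // "héllo" (negative limit: unchanged)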
func (s *autoSpan) End(opts ...SpanEndOption) {
if s == nil || !s.sampled.Swap(false) {
return
}
// s.end exists so the lock (s.mu) is not held while s.ended is called.
s.ended(s.end(opts))
}
func (s *autoSpan) end(opts []SpanEndOption) []byte {
s.mu.Lock()
defer s.mu.Unlock()
cfg := NewSpanEndConfig(opts...)
if t := cfg.Timestamp(); !t.IsZero() {
s.span.EndTime = cfg.Timestamp()
} else {
s.span.EndTime = time.Now()
}
b, _ := json.Marshal(s.traces) // TODO: do not ignore this error.
return b
}
// Expected to be implemented in eBPF.
//
//go:noinline
func (*autoSpan) ended(buf []byte) { ended(buf) }
// ended is used for testing.
var ended = func([]byte) {}
func (s *autoSpan) RecordError(err error, opts ...EventOption) {
if s == nil || err == nil || !s.sampled.Load() {
return
}
cfg := NewEventConfig(opts...)
attrs := cfg.Attributes()
attrs = append(attrs,
semconv.ExceptionType(typeStr(err)),
semconv.ExceptionMessage(err.Error()),
)
if cfg.StackTrace() {
buf := make([]byte, 2048)
n := runtime.Stack(buf, false)
attrs = append(attrs, semconv.ExceptionStacktrace(string(buf[0:n])))
}
s.mu.Lock()
defer s.mu.Unlock()
s.addEvent(semconv.ExceptionEventName, cfg.Timestamp(), attrs)
}
func typeStr(i any) string {
t := reflect.TypeOf(i)
if t.PkgPath() == "" && t.Name() == "" {
// Likely a builtin type.
return t.String()
}
return fmt.Sprintf("%s.%s", t.PkgPath(), t.Name())
}
func (s *autoSpan) AddEvent(name string, opts ...EventOption) {
if s == nil || !s.sampled.Load() {
return
}
cfg := NewEventConfig(opts...)
s.mu.Lock()
defer s.mu.Unlock()
s.addEvent(name, cfg.Timestamp(), cfg.Attributes())
}
// addEvent adds an event with name and attrs at tStamp to the span. The span
// lock (s.mu) needs to be held by the caller.
func (s *autoSpan) addEvent(name string, tStamp time.Time, attrs []attribute.KeyValue) {
limit := maxSpan.Events
if limit == 0 {
s.span.DroppedEvents++
return
}
if limit > 0 && len(s.span.Events) == limit {
// Drop head while avoiding allocation of more capacity.
copy(s.span.Events[:limit-1], s.span.Events[1:])
s.span.Events = s.span.Events[:limit-1]
s.span.DroppedEvents++
}
e := &telemetry.SpanEvent{Time: tStamp, Name: name}
e.Attrs, e.DroppedAttrs = convCappedAttrs(maxSpan.EventAttrs, attrs)
s.span.Events = append(s.span.Events, e)
}
func (s *autoSpan) AddLink(link Link) {
if s == nil || !s.sampled.Load() {
return
}
l := maxSpan.Links
s.mu.Lock()
defer s.mu.Unlock()
if l == 0 {
s.span.DroppedLinks++
return
}
if l > 0 && len(s.span.Links) == l {
// Drop head while avoiding allocation of more capacity.
copy(s.span.Links[:l-1], s.span.Links[1:])
s.span.Links = s.span.Links[:l-1]
s.span.DroppedLinks++
}
s.span.Links = append(s.span.Links, convLink(link))
}
func convLinks(links []Link) []*telemetry.SpanLink {
out := make([]*telemetry.SpanLink, 0, len(links))
for _, link := range links {
out = append(out, convLink(link))
}
return out
}
func convLink(link Link) *telemetry.SpanLink {
l := &telemetry.SpanLink{
TraceID: telemetry.TraceID(link.SpanContext.TraceID()),
SpanID: telemetry.SpanID(link.SpanContext.SpanID()),
TraceState: link.SpanContext.TraceState().String(),
Flags: uint32(link.SpanContext.TraceFlags()),
}
l.Attrs, l.DroppedAttrs = convCappedAttrs(maxSpan.LinkAttrs, link.Attributes)
return l
}
func (s *autoSpan) SetName(name string) {
if s == nil || !s.sampled.Load() {
return
}
s.mu.Lock()
defer s.mu.Unlock()
s.span.Name = name
}
func (*autoSpan) TracerProvider() TracerProvider { return newAutoTracerProvider() }
// maxSpan holds the span limits resolved during startup.
var maxSpan = newSpanLimits()
type spanLimits struct {
// Attrs is the number of allowed attributes for a span.
//
// This is resolved from the environment variable value for the
// OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT key if it exists. Otherwise, the
// environment variable value for OTEL_ATTRIBUTE_COUNT_LIMIT, or 128 if
// that is not set, is used.
Attrs int
// AttrValueLen is the maximum attribute value length allowed for a span.
//
// This is resolved from the environment variable value for the
// OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT key if it exists. Otherwise, the
// environment variable value for OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, or -1
// if that is not set, is used.
AttrValueLen int
// Events is the number of allowed events for a span.
//
// This is resolved from the environment variable value for the
// OTEL_SPAN_EVENT_COUNT_LIMIT key, or 128 is used if that is not set.
Events int
// EventAttrs is the number of allowed attributes for a span event.
//
// This is resolved from the environment variable value for the
// OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT key, or 128 is used if that is not set.
EventAttrs int
// Links is the number of allowed Links for a span.
//
// This is resolved from the environment variable value for the
// OTEL_SPAN_LINK_COUNT_LIMIT, or 128 is used if that is not set.
Links int
// LinkAttrs is the number of allowed attributes for a span link.
//
// This is resolved from the environment variable value for the
// OTEL_LINK_ATTRIBUTE_COUNT_LIMIT, or 128 is used if that is not set.
LinkAttrs int
}
func newSpanLimits() spanLimits {
return spanLimits{
Attrs: firstEnv(
128,
"OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT",
"OTEL_ATTRIBUTE_COUNT_LIMIT",
),
AttrValueLen: firstEnv(
-1, // Unlimited.
"OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT",
"OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT",
),
Events: firstEnv(128, "OTEL_SPAN_EVENT_COUNT_LIMIT"),
EventAttrs: firstEnv(128, "OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT"),
Links: firstEnv(128, "OTEL_SPAN_LINK_COUNT_LIMIT"),
LinkAttrs: firstEnv(128, "OTEL_LINK_ATTRIBUTE_COUNT_LIMIT"),
}
}
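// For instance, starting the process with
//
//	OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT=64 OTEL_SPAN_EVENT_COUNT_LIMIT=0
//
// caps each span at 64 attributes and drops all span events, while the
// remaining limits keep their defaults.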
// firstEnv returns the parsed integer value of the first matching environment
// variable from keys. The defaultVal is returned if the value is not an
// integer or no match is found.
func firstEnv(defaultVal int, keys ...string) int {
for _, key := range keys {
strV := os.Getenv(key)
if strV == "" {
continue
}
v, err := strconv.Atoi(strV)
if err == nil {
return v
}
// Ignore invalid environment variable.
}
return defaultVal
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"slices"
"time"
"go.opentelemetry.io/otel/attribute"
)
// TracerConfig is a group of options for a Tracer.
type TracerConfig struct {
instrumentationVersion string
// Schema URL of the telemetry emitted by the Tracer.
schemaURL string
attrs attribute.Set
}
// InstrumentationVersion returns the version of the library providing instrumentation.
func (t *TracerConfig) InstrumentationVersion() string {
return t.instrumentationVersion
}
// InstrumentationAttributes returns the attributes associated with the library
// providing instrumentation.
func (t *TracerConfig) InstrumentationAttributes() attribute.Set {
return t.attrs
}
// SchemaURL returns the Schema URL of the telemetry emitted by the Tracer.
func (t *TracerConfig) SchemaURL() string {
return t.schemaURL
}
// NewTracerConfig applies all the options to a returned TracerConfig.
func NewTracerConfig(options ...TracerOption) TracerConfig {
var config TracerConfig
for _, option := range options {
config = option.apply(config)
}
return config
}
// TracerOption applies an option to a TracerConfig.
type TracerOption interface {
apply(TracerConfig) TracerConfig
}
type tracerOptionFunc func(TracerConfig) TracerConfig
func (fn tracerOptionFunc) apply(cfg TracerConfig) TracerConfig {
return fn(cfg)
}
// SpanConfig is a group of options for a Span.
type SpanConfig struct {
attributes []attribute.KeyValue
timestamp time.Time
links []Link
newRoot bool
spanKind SpanKind
stackTrace bool
}
// Attributes describe the associated qualities of a Span.
func (cfg *SpanConfig) Attributes() []attribute.KeyValue {
return cfg.attributes
}
// Timestamp is a time in a Span life-cycle.
func (cfg *SpanConfig) Timestamp() time.Time {
return cfg.timestamp
}
// StackTrace reports whether stack trace capturing is enabled.
func (cfg *SpanConfig) StackTrace() bool {
return cfg.stackTrace
}
// Links are the associations a Span has with other Spans.
func (cfg *SpanConfig) Links() []Link {
return cfg.links
}
// NewRoot identifies a Span as the root Span for a new trace. This is
// commonly used when an existing trace crosses trust boundaries and the
// remote parent span context should be ignored for security.
func (cfg *SpanConfig) NewRoot() bool {
return cfg.newRoot
}
// SpanKind is the role a Span has in a trace.
func (cfg *SpanConfig) SpanKind() SpanKind {
return cfg.spanKind
}
// NewSpanStartConfig applies all the options to a returned SpanConfig.
// No validation is performed on the returned SpanConfig (e.g. no uniqueness
// checking or bounding of data); it is left to the SDK to perform this
// action.
func NewSpanStartConfig(options ...SpanStartOption) SpanConfig {
var c SpanConfig
for _, option := range options {
c = option.applySpanStart(c)
}
return c
}
// NewSpanEndConfig applies all the options to a returned SpanConfig.
// No validation is performed on the returned SpanConfig (e.g. no uniqueness
// checking or bounding of data); it is left to the SDK to perform this
// action.
func NewSpanEndConfig(options ...SpanEndOption) SpanConfig {
var c SpanConfig
for _, option := range options {
c = option.applySpanEnd(c)
}
return c
}
// SpanStartOption applies an option to a SpanConfig. These options are applicable
// only when the span is created.
type SpanStartOption interface {
applySpanStart(SpanConfig) SpanConfig
}
type spanOptionFunc func(SpanConfig) SpanConfig
func (fn spanOptionFunc) applySpanStart(cfg SpanConfig) SpanConfig {
return fn(cfg)
}
// SpanEndOption applies an option to a SpanConfig. These options are
// applicable only when the span is ended.
type SpanEndOption interface {
applySpanEnd(SpanConfig) SpanConfig
}
// EventConfig is a group of options for an Event.
type EventConfig struct {
attributes []attribute.KeyValue
timestamp time.Time
stackTrace bool
}
// Attributes describe the associated qualities of an Event.
func (cfg *EventConfig) Attributes() []attribute.KeyValue {
return cfg.attributes
}
// Timestamp is a time in an Event life-cycle.
func (cfg *EventConfig) Timestamp() time.Time {
return cfg.timestamp
}
// StackTrace reports whether stack trace capturing is enabled.
func (cfg *EventConfig) StackTrace() bool {
return cfg.stackTrace
}
// NewEventConfig applies all the EventOptions to a returned EventConfig. If no
// timestamp option is passed, the returned EventConfig will have a Timestamp
// set to the call time; otherwise, no validation is performed on the returned
// EventConfig.
func NewEventConfig(options ...EventOption) EventConfig {
var c EventConfig
for _, option := range options {
c = option.applyEvent(c)
}
if c.timestamp.IsZero() {
c.timestamp = time.Now()
}
return c
}
// EventOption applies span event options to an EventConfig.
type EventOption interface {
applyEvent(EventConfig) EventConfig
}
// SpanOption are options that can be used at both the beginning and end of a span.
type SpanOption interface {
SpanStartOption
SpanEndOption
}
// SpanStartEventOption are options that can be used at the start of a span, or with an event.
type SpanStartEventOption interface {
SpanStartOption
EventOption
}
// SpanEndEventOption are options that can be used at the end of a span, or with an event.
type SpanEndEventOption interface {
SpanEndOption
EventOption
}
type attributeOption []attribute.KeyValue
func (o attributeOption) applySpan(c SpanConfig) SpanConfig {
c.attributes = append(c.attributes, []attribute.KeyValue(o)...)
return c
}
func (o attributeOption) applySpanStart(c SpanConfig) SpanConfig { return o.applySpan(c) }
func (o attributeOption) applyEvent(c EventConfig) EventConfig {
c.attributes = append(c.attributes, []attribute.KeyValue(o)...)
return c
}
var _ SpanStartEventOption = attributeOption{}
// WithAttributes adds the attributes related to a span life-cycle event.
// These attributes are used to describe the work a Span represents when this
// option is provided to a Span's start event. Otherwise, these
// attributes provide additional information about the event being recorded
// (e.g. error, state change, processing progress, system event).
//
// If multiple of these options are passed the attributes of each successive
// option will extend the attributes instead of overwriting. There is no
// guarantee of uniqueness in the resulting attributes.
func WithAttributes(attributes ...attribute.KeyValue) SpanStartEventOption {
return attributeOption(attributes)
}
// SpanEventOption are options that can be used with an event or a span.
type SpanEventOption interface {
SpanOption
EventOption
}
type timestampOption time.Time
func (o timestampOption) applySpan(c SpanConfig) SpanConfig {
c.timestamp = time.Time(o)
return c
}
func (o timestampOption) applySpanStart(c SpanConfig) SpanConfig { return o.applySpan(c) }
func (o timestampOption) applySpanEnd(c SpanConfig) SpanConfig { return o.applySpan(c) }
func (o timestampOption) applyEvent(c EventConfig) EventConfig {
c.timestamp = time.Time(o)
return c
}
var _ SpanEventOption = timestampOption{}
// WithTimestamp sets the time of a Span or Event life-cycle moment (e.g.
// started, stopped, errored).
func WithTimestamp(t time.Time) SpanEventOption {
return timestampOption(t)
}
type stackTraceOption bool
func (o stackTraceOption) applyEvent(c EventConfig) EventConfig {
c.stackTrace = bool(o)
return c
}
func (o stackTraceOption) applySpan(c SpanConfig) SpanConfig {
c.stackTrace = bool(o)
return c
}
func (o stackTraceOption) applySpanEnd(c SpanConfig) SpanConfig { return o.applySpan(c) }
// WithStackTrace sets the flag to capture the error with a stack trace (e.g. true, false).
func WithStackTrace(b bool) SpanEndEventOption {
return stackTraceOption(b)
}
// WithLinks adds links to a Span. The links are added to the existing Span
// links, i.e. this does not overwrite. Links with invalid span context are ignored.
func WithLinks(links ...Link) SpanStartOption {
return spanOptionFunc(func(cfg SpanConfig) SpanConfig {
cfg.links = append(cfg.links, links...)
return cfg
})
}
// WithNewRoot specifies that the Span should be treated as a root Span. Any
// existing parent span context will be ignored when defining the Span's trace
// identifiers.
func WithNewRoot() SpanStartOption {
return spanOptionFunc(func(cfg SpanConfig) SpanConfig {
cfg.newRoot = true
return cfg
})
}
// WithSpanKind sets the SpanKind of a Span.
func WithSpanKind(kind SpanKind) SpanStartOption {
return spanOptionFunc(func(cfg SpanConfig) SpanConfig {
cfg.spanKind = kind
return cfg
})
}
// WithInstrumentationVersion sets the instrumentation version.
func WithInstrumentationVersion(version string) TracerOption {
return tracerOptionFunc(func(cfg TracerConfig) TracerConfig {
cfg.instrumentationVersion = version
return cfg
})
}
// mergeSets returns the union of keys between a and b. Any duplicate keys will
// use the value associated with b.
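//
// For example (illustrative):
//
//	a := attribute.NewSet(attribute.String("k", "a"), attribute.String("x", "1"))
//	b := attribute.NewSet(attribute.String("k", "b"))
//	m := mergeSets(a, b) // m contains k=b and x=1.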
func mergeSets(a, b attribute.Set) attribute.Set {
// NewMergeIterator uses the first value for any duplicates.
iter := attribute.NewMergeIterator(&b, &a)
merged := make([]attribute.KeyValue, 0, a.Len()+b.Len())
for iter.Next() {
merged = append(merged, iter.Attribute())
}
return attribute.NewSet(merged...)
}
// WithInstrumentationAttributes adds the instrumentation attributes.
//
// This is equivalent to calling [WithInstrumentationAttributeSet] with an
// [attribute.Set] created from a clone of the passed attributes.
// [WithInstrumentationAttributeSet] is recommended for more control.
//
// If multiple [WithInstrumentationAttributes] or [WithInstrumentationAttributeSet]
// options are passed, the attributes will be merged together in the order
// they are passed. Attributes with duplicate keys will use the last value passed.
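//
// A minimal sketch (the tracer name and attributes are illustrative):
//
//	tracer := otel.Tracer("app",
//		trace.WithInstrumentationAttributes(attribute.String("tenant", "a")),
//		trace.WithInstrumentationAttributes(attribute.String("tenant", "b")),
//	)
//	// The tracer's instrumentation attributes contain tenant=b.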
func WithInstrumentationAttributes(attr ...attribute.KeyValue) TracerOption {
set := attribute.NewSet(slices.Clone(attr)...)
return WithInstrumentationAttributeSet(set)
}
// WithInstrumentationAttributeSet adds the instrumentation attributes.
//
// If multiple [WithInstrumentationAttributes] or [WithInstrumentationAttributeSet]
// options are passed, the attributes will be merged together in the order
// they are passed. Attributes with duplicate keys will use the last value passed.
func WithInstrumentationAttributeSet(set attribute.Set) TracerOption {
if set.Len() == 0 {
return tracerOptionFunc(func(config TracerConfig) TracerConfig {
return config
})
}
return tracerOptionFunc(func(config TracerConfig) TracerConfig {
if config.attrs.Len() == 0 {
config.attrs = set
} else {
config.attrs = mergeSets(config.attrs, set)
}
return config
})
}
// WithSchemaURL sets the schema URL for the Tracer.
func WithSchemaURL(schemaURL string) TracerOption {
return tracerOptionFunc(func(cfg TracerConfig) TracerConfig {
cfg.schemaURL = schemaURL
return cfg
})
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import "context"
type traceContextKeyType int
const currentSpanKey traceContextKeyType = iota
// ContextWithSpan returns a copy of parent with span set as the current Span.
func ContextWithSpan(parent context.Context, span Span) context.Context {
return context.WithValue(parent, currentSpanKey, span)
}
// ContextWithSpanContext returns a copy of parent with sc as the current
// Span. The Span implementation that wraps sc is non-recording and performs
// no operations other than to return sc as the SpanContext from the
// SpanContext method.
func ContextWithSpanContext(parent context.Context, sc SpanContext) context.Context {
return ContextWithSpan(parent, nonRecordingSpan{sc: sc})
}
// ContextWithRemoteSpanContext returns a copy of parent with rsc set explicitly
// as a remote SpanContext and as the current Span. The Span implementation
// that wraps rsc is non-recording and performs no operations other than to
// return rsc as the SpanContext from the SpanContext method.
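//
// Illustrative sketch, e.g. in a test (the IDs are placeholders):
//
//	sc := NewSpanContext(SpanContextConfig{
//		TraceID: TraceID{0x01},
//		SpanID:  SpanID{0x02},
//	})
//	ctx := ContextWithRemoteSpanContext(context.Background(), sc)
//	_ = SpanContextFromContext(ctx).IsRemote() // true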
func ContextWithRemoteSpanContext(parent context.Context, rsc SpanContext) context.Context {
return ContextWithSpanContext(parent, rsc.WithRemote(true))
}
// SpanFromContext returns the current Span from ctx.
//
// If no Span is currently set in ctx, an implementation of a Span that
// performs no operations is returned.
func SpanFromContext(ctx context.Context) Span {
if ctx == nil {
return noopSpanInstance
}
if span, ok := ctx.Value(currentSpanKey).(Span); ok {
return span
}
return noopSpanInstance
}
// SpanContextFromContext returns the current Span's SpanContext.
func SpanContextFromContext(ctx context.Context) SpanContext {
return SpanFromContext(ctx).SpanContext()
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
// Attr is a key-value pair.
type Attr struct {
Key string `json:"key,omitempty"`
Value Value `json:"value,omitempty"`
}
// String returns an Attr for a string value.
func String(key, value string) Attr {
return Attr{key, StringValue(value)}
}
// Int64 returns an Attr for an int64 value.
func Int64(key string, value int64) Attr {
return Attr{key, Int64Value(value)}
}
// Int returns an Attr for an int value.
func Int(key string, value int) Attr {
return Int64(key, int64(value))
}
// Float64 returns an Attr for a float64 value.
func Float64(key string, value float64) Attr {
return Attr{key, Float64Value(value)}
}
// Bool returns an Attr for a bool value.
func Bool(key string, value bool) Attr {
return Attr{key, BoolValue(value)}
}
// Bytes returns an Attr for a []byte value.
// The passed slice must not be changed after it is passed.
func Bytes(key string, value []byte) Attr {
return Attr{key, BytesValue(value)}
}
// Slice returns an Attr for a []Value value.
// The passed slice must not be changed after it is passed.
func Slice(key string, value ...Value) Attr {
return Attr{key, SliceValue(value...)}
}
// Map returns an Attr for a map value.
// The passed slice must not be changed after it is passed.
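//
// Illustrative sketch of a nested attribute:
//
//	a := Map("http", String("method", "GET"), Int("status", 200))
//	_ = a.Value.AsMap() // The two nested Attrs.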
func Map(key string, value ...Attr) Attr {
return Attr{key, MapValue(value...)}
}
// Equal reports whether a is equal to b.
func (a Attr) Equal(b Attr) bool {
return a.Key == b.Key && a.Value.Equal(b.Value)
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"encoding/hex"
"errors"
"fmt"
)
const (
traceIDSize = 16
spanIDSize = 8
)
// TraceID is a custom data type that is used for all trace IDs.
type TraceID [traceIDSize]byte
// String returns the hex string representation of the TraceID.
func (tid TraceID) String() string {
return hex.EncodeToString(tid[:])
}
// IsEmpty reports whether the TraceID contains only zero bytes.
func (tid TraceID) IsEmpty() bool {
return tid == [traceIDSize]byte{}
}
// MarshalJSON converts the trace ID into a hex string enclosed in quotes.
func (tid TraceID) MarshalJSON() ([]byte, error) {
if tid.IsEmpty() {
return []byte(`""`), nil
}
return marshalJSON(tid[:])
}
// UnmarshalJSON inflates the trace ID from hex string, possibly enclosed in
// quotes.
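//
// For example (illustrative; error handling elided):
//
//	var tid TraceID
//	_ = json.Unmarshal([]byte(`"000102030405060708090a0b0c0d0e0f"`), &tid)
//	// tid.String() == "000102030405060708090a0b0c0d0e0f"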
func (tid *TraceID) UnmarshalJSON(data []byte) error {
*tid = [traceIDSize]byte{}
return unmarshalJSON(tid[:], data)
}
// SpanID is a custom data type that is used for all span IDs.
type SpanID [spanIDSize]byte
// String returns the hex string representation of the SpanID.
func (sid SpanID) String() string {
return hex.EncodeToString(sid[:])
}
// IsEmpty reports whether the SpanID contains only zero bytes.
func (sid SpanID) IsEmpty() bool {
return sid == [spanIDSize]byte{}
}
// MarshalJSON converts the span ID into a hex string enclosed in quotes.
func (sid SpanID) MarshalJSON() ([]byte, error) {
if sid.IsEmpty() {
return []byte(`""`), nil
}
return marshalJSON(sid[:])
}
// UnmarshalJSON decodes span ID from hex string, possibly enclosed in quotes.
func (sid *SpanID) UnmarshalJSON(data []byte) error {
*sid = [spanIDSize]byte{}
return unmarshalJSON(sid[:], data)
}
// marshalJSON converts id into a hex string enclosed in quotes.
func marshalJSON(id []byte) ([]byte, error) {
// Plus 2 quote chars at the start and end.
hexLen := hex.EncodedLen(len(id)) + 2
b := make([]byte, hexLen)
hex.Encode(b[1:hexLen-1], id)
b[0], b[hexLen-1] = '"', '"'
return b, nil
}
// unmarshalJSON inflates an ID from a hex string, possibly enclosed in quotes.
func unmarshalJSON(dst, src []byte) error {
if l := len(src); l >= 2 && src[0] == '"' && src[l-1] == '"' {
src = src[1 : l-1]
}
nLen := len(src)
if nLen == 0 {
return nil
}
if len(dst) != hex.DecodedLen(nLen) {
return errors.New("invalid length for ID")
}
_, err := hex.Decode(dst, src)
if err != nil {
return fmt.Errorf("cannot unmarshal ID from string '%s': %w", string(src), err)
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"encoding/json"
"strconv"
)
// protoInt64 represents the protobuf encoding of integers which can be either
// strings or integers.
type protoInt64 int64
// Int64 returns the protoInt64 as an int64.
func (i *protoInt64) Int64() int64 { return int64(*i) }
// UnmarshalJSON decodes both strings and integers.
func (i *protoInt64) UnmarshalJSON(data []byte) error {
if data[0] == '"' {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
parsedInt, err := strconv.ParseInt(str, 10, 64)
if err != nil {
return err
}
*i = protoInt64(parsedInt)
} else {
var parsedInt int64
if err := json.Unmarshal(data, &parsedInt); err != nil {
return err
}
*i = protoInt64(parsedInt)
}
return nil
}
// protoUint64 represents the protobuf encoding of integers which can be either
// strings or integers.
type protoUint64 uint64
// Uint64 returns the protoUint64 as a uint64.
func (i *protoUint64) Uint64() uint64 { return uint64(*i) }
// UnmarshalJSON decodes both strings and integers.
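//
// For example (illustrative), both of these decode to 42:
//
//	var a, b protoUint64
//	_ = json.Unmarshal([]byte(`"42"`), &a)
//	_ = json.Unmarshal([]byte(`42`), &b)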
func (i *protoUint64) UnmarshalJSON(data []byte) error {
if data[0] == '"' {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
parsedUint, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return err
}
*i = protoUint64(parsedUint)
} else {
var parsedUint uint64
if err := json.Unmarshal(data, &parsedUint); err != nil {
return err
}
*i = protoUint64(parsedUint)
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
)
// Resource information.
type Resource struct {
// Attrs are the set of attributes that describe the resource. Attribute
// keys MUST be unique (it is not allowed to have more than one attribute
// with the same key).
Attrs []Attr `json:"attributes,omitempty"`
// DroppedAttrs is the number of dropped attributes. If the value
// is 0, then no attributes were dropped.
DroppedAttrs uint32 `json:"droppedAttributesCount,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into r.
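//
// A minimal sketch of the accepted OTLP JSON form (values are illustrative):
//
//	var r Resource
//	_ = json.Unmarshal([]byte(`{
//	    "attributes": [{"key": "service.name", "value": {"stringValue": "svc"}}],
//	    "droppedAttributesCount": 1
//	}`), &r)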
func (r *Resource) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid Resource type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid Resource field: %#v", keyIface)
}
switch key {
case "attributes":
err = decoder.Decode(&r.Attrs)
case "droppedAttributesCount", "dropped_attributes_count":
err = decoder.Decode(&r.DroppedAttrs)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
)
// Scope is the identifying values of the instrumentation scope.
type Scope struct {
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
Attrs []Attr `json:"attributes,omitempty"`
DroppedAttrs uint32 `json:"droppedAttributesCount,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into s.
func (s *Scope) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid Scope type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid Scope field: %#v", keyIface)
}
switch key {
case "name":
err = decoder.Decode(&s.Name)
case "version":
err = decoder.Decode(&s.Version)
case "attributes":
err = decoder.Decode(&s.Attrs)
case "droppedAttributesCount", "dropped_attributes_count":
err = decoder.Decode(&s.DroppedAttrs)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"time"
)
// A Span represents a single operation performed by a single component of the
// system.
type Span struct {
// A unique identifier for a trace. All spans from the same trace share
// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is required.
TraceID TraceID `json:"traceId,omitempty"`
// A unique identifier for a span within a trace, assigned when the span
// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
// other than 8 bytes is considered invalid (empty string in OTLP/JSON
// is zero-length and thus is also invalid).
//
// This field is required.
SpanID SpanID `json:"spanId,omitempty"`
// trace_state conveys information about request position in multiple distributed tracing graphs.
// It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header
// See also https://github.com/w3c/distributed-tracing for more details about this field.
TraceState string `json:"traceState,omitempty"`
// The `span_id` of this span's parent span. If this is a root span, then this
// field must be empty. The ID is an 8-byte array.
ParentSpanID SpanID `json:"parentSpanId,omitempty"`
// Flags, a bit field.
//
// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace
// Context specification. To read the 8-bit W3C trace flag, use
// `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`.
//
// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions.
//
// Bits 8 and 9 represent the 3 states of whether a span's parent
// is remote. The states are (unknown, is not remote, is remote).
// To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`.
// To read whether the span is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`.
//
// When creating span messages, if the message is logically forwarded from another source
// with an equivalent flags fields (i.e., usually another OTLP span message), the field SHOULD
// be copied as-is. If creating from a source that does not have an equivalent flags field
// (such as a runtime representation of an OpenTelemetry span), the high 22 bits MUST
// be set to zero.
// Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero.
//
// [Optional].
Flags uint32 `json:"flags,omitempty"`
// A description of the span's operation.
//
// For example, the name can be a qualified method name or a file name
// and a line number where the operation is called. A best practice is to use
// the same display name at the same call point in an application.
// This makes it easier to correlate spans in different traces.
//
// This field is semantically required to be set to non-empty string.
// Empty value is equivalent to an unknown span name.
//
// This field is required.
Name string `json:"name"`
// Distinguishes between spans generated in a particular context. For example,
// two spans with the same name may be distinguished using `CLIENT` (caller)
// and `SERVER` (callee) to identify queueing latency associated with the span.
Kind SpanKind `json:"kind,omitempty"`
// start_time_unix_nano is the start time of the span. On the client side, this is the time
// kept by the local machine where the span execution starts. On the server side, this
// is the time when the server's application handler starts running.
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
//
// This field is semantically required and it is expected that end_time >= start_time.
StartTime time.Time `json:"startTimeUnixNano,omitempty"`
// end_time_unix_nano is the end time of the span. On the client side, this is the time
// kept by the local machine where the span execution ends. On the server side, this
// is the time when the server application handler stops running.
// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
//
// This field is semantically required and it is expected that end_time >= start_time.
EndTime time.Time `json:"endTimeUnixNano,omitempty"`
// attributes is a collection of key/value pairs. Note, global attributes
// like server name can be set using the resource API. Examples of attributes:
//
// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
// "/http/server_latency": 300
// "example.com/myattribute": true
// "example.com/score": 10.239
//
// The OpenTelemetry API specification further restricts the allowed value types:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/README.md#attribute
// Attribute keys MUST be unique (it is not allowed to have more than one
// attribute with the same key).
Attrs []Attr `json:"attributes,omitempty"`
// dropped_attributes_count is the number of attributes that were discarded. Attributes
// can be discarded because their keys are too long or because there are too many
// attributes. If this value is 0, then no attributes were dropped.
DroppedAttrs uint32 `json:"droppedAttributesCount,omitempty"`
// events is a collection of Event items.
Events []*SpanEvent `json:"events,omitempty"`
// dropped_events_count is the number of dropped events. If the value is 0, then no
// events were dropped.
DroppedEvents uint32 `json:"droppedEventsCount,omitempty"`
// links is a collection of Links, which are references from this span to a span
// in the same or different trace.
Links []*SpanLink `json:"links,omitempty"`
// dropped_links_count is the number of dropped links after the maximum size was
// enforced. If this value is 0, then no links were dropped.
DroppedLinks uint32 `json:"droppedLinksCount,omitempty"`
// An optional final status for this span. Semantically, when Status isn't
// set, the span's status code is unset, i.e. assume STATUS_CODE_UNSET
// (code = 0).
Status *Status `json:"status,omitempty"`
}
// MarshalJSON encodes s into OTLP formatted JSON.
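// Zero or pre-epoch start and end times are encoded as 0, and an empty
// ParentSpanID is omitted entirely rather than encoded as an empty string.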
func (s Span) MarshalJSON() ([]byte, error) {
startT := s.StartTime.UnixNano()
if s.StartTime.IsZero() || startT < 0 {
startT = 0
}
endT := s.EndTime.UnixNano()
if s.EndTime.IsZero() || endT < 0 {
endT = 0
}
// Override the embedded SpanID marshaling so an empty ParentSpanID is
// omitted rather than encoded as "".
var parentSpanId string
if !s.ParentSpanID.IsEmpty() {
b := make([]byte, hex.EncodedLen(spanIDSize))
hex.Encode(b, s.ParentSpanID[:])
parentSpanId = string(b)
}
type Alias Span
return json.Marshal(struct {
Alias
ParentSpanID string `json:"parentSpanId,omitempty"`
StartTime uint64 `json:"startTimeUnixNano,omitempty"`
EndTime uint64 `json:"endTimeUnixNano,omitempty"`
}{
Alias: Alias(s),
ParentSpanID: parentSpanId,
StartTime: uint64(startT), // nolint:gosec // >= 0 checked above.
EndTime: uint64(endT), // nolint:gosec // >= 0 checked above.
})
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into s.
func (s *Span) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid Span type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid Span field: %#v", keyIface)
}
switch key {
case "traceId", "trace_id":
err = decoder.Decode(&s.TraceID)
case "spanId", "span_id":
err = decoder.Decode(&s.SpanID)
case "traceState", "trace_state":
err = decoder.Decode(&s.TraceState)
case "parentSpanId", "parent_span_id":
err = decoder.Decode(&s.ParentSpanID)
case "flags":
err = decoder.Decode(&s.Flags)
case "name":
err = decoder.Decode(&s.Name)
case "kind":
err = decoder.Decode(&s.Kind)
case "startTimeUnixNano", "start_time_unix_nano":
var val protoUint64
err = decoder.Decode(&val)
v := int64(min(val.Uint64(), math.MaxInt64)) // nolint: gosec // Overflow checked.
s.StartTime = time.Unix(0, v)
case "endTimeUnixNano", "end_time_unix_nano":
var val protoUint64
err = decoder.Decode(&val)
v := int64(min(val.Uint64(), math.MaxInt64)) // nolint: gosec // Overflow checked.
s.EndTime = time.Unix(0, v)
case "attributes":
err = decoder.Decode(&s.Attrs)
case "droppedAttributesCount", "dropped_attributes_count":
err = decoder.Decode(&s.DroppedAttrs)
case "events":
err = decoder.Decode(&s.Events)
case "droppedEventsCount", "dropped_events_count":
err = decoder.Decode(&s.DroppedEvents)
case "links":
err = decoder.Decode(&s.Links)
case "droppedLinksCount", "dropped_links_count":
err = decoder.Decode(&s.DroppedLinks)
case "status":
err = decoder.Decode(&s.Status)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// SpanFlags represents constants used to interpret the
// Span.flags field, which is a protobuf 'fixed32' type and is to
// be used as a bit field. Each non-zero value defined in this enum is
// a bit-mask. To extract the bit-field, for example, use an
// expression like:
//
// (span.flags & SPAN_FLAGS_TRACE_FLAGS_MASK)
//
// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions.
//
// Note that Span flags were introduced in version 1.1 of the
// OpenTelemetry protocol. Older Span producers do not set this
// field; consequently, consumers should not rely on the absence of a
// particular flag bit to indicate the presence of a particular feature.
type SpanFlags int32
const (
// SpanFlagsTraceFlagsMask is a mask for trace-flags.
//
// Bits 0-7 are used for trace flags.
SpanFlagsTraceFlagsMask SpanFlags = 255
// SpanFlagsContextHasIsRemoteMask is a mask for HAS_IS_REMOTE status.
//
// Bits 8 and 9 are used to indicate that the parent span or link span is
// remote. Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known.
SpanFlagsContextHasIsRemoteMask SpanFlags = 256
// SpanFlagsContextIsRemoteMask is a mask for IS_REMOTE status.
//
// Bits 8 and 9 are used to indicate that the parent span or link span is
// remote. Bit 9 (`IS_REMOTE`) indicates whether the span or link is
// remote.
SpanFlagsContextIsRemoteMask SpanFlags = 512
)
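// For example (illustrative), given a Span s, to read the remote-parent
// state:
//
//	known := SpanFlags(s.Flags)&SpanFlagsContextHasIsRemoteMask != 0
//	remote := SpanFlags(s.Flags)&SpanFlagsContextIsRemoteMask != 0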
// SpanKind is the type of span. Can be used to specify additional relationships between spans
// in addition to a parent/child relationship.
type SpanKind int32
const (
// SpanKindInternal indicates that the span represents an internal
// operation within an application, as opposed to an operation happening at
// the boundaries.
SpanKindInternal SpanKind = 1
// SpanKindServer indicates that the span covers server-side handling of an
// RPC or other remote network request.
SpanKindServer SpanKind = 2
// SpanKindClient indicates that the span describes a request to some
// remote service.
SpanKindClient SpanKind = 3
// SpanKindProducer indicates that the span describes a producer sending a
// message to a broker. Unlike SpanKindClient and SpanKindServer, there is
// often no direct critical path latency relationship between producer and
// consumer spans. A SpanKindProducer span ends when the message is
// accepted by the broker, while the logical processing of the message might
// span a much longer time.
SpanKindProducer SpanKind = 4
// SpanKindConsumer indicates that the span describes a consumer receiving
// a message from a broker. Like SpanKindProducer, there is often no direct
// critical path latency relationship between producer and consumer spans.
SpanKindConsumer SpanKind = 5
)
// SpanEvent is a time-stamped annotation of the span, consisting of
// user-supplied text description and key-value pairs.
type SpanEvent struct {
// time_unix_nano is the time the event occurred.
Time time.Time `json:"timeUnixNano,omitempty"`
// name of the event.
// This field is semantically required to be set to non-empty string.
Name string `json:"name,omitempty"`
// attributes is a collection of attribute key/value pairs on the event.
// Attribute keys MUST be unique (it is not allowed to have more than one
// attribute with the same key).
Attrs []Attr `json:"attributes,omitempty"`
// dropped_attributes_count is the number of dropped attributes. If the value is 0,
// then no attributes were dropped.
DroppedAttrs uint32 `json:"droppedAttributesCount,omitempty"`
}
// MarshalJSON encodes e into OTLP formatted JSON.
func (e SpanEvent) MarshalJSON() ([]byte, error) {
t := e.Time.UnixNano()
if e.Time.IsZero() || t < 0 {
t = 0
}
type Alias SpanEvent
return json.Marshal(struct {
Alias
Time uint64 `json:"timeUnixNano,omitempty"`
}{
Alias: Alias(e),
Time: uint64(t), // nolint: gosec // >= 0 checked above
})
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into se.
func (se *SpanEvent) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid SpanEvent type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid SpanEvent field: %#v", keyIface)
}
switch key {
case "timeUnixNano", "time_unix_nano":
var val protoUint64
err = decoder.Decode(&val)
v := int64(min(val.Uint64(), math.MaxInt64)) // nolint: gosec // Overflow checked.
se.Time = time.Unix(0, v)
case "name":
err = decoder.Decode(&se.Name)
case "attributes":
err = decoder.Decode(&se.Attrs)
case "droppedAttributesCount", "dropped_attributes_count":
err = decoder.Decode(&se.DroppedAttrs)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// SpanLink is a reference from the current span to another span in the same
// trace or in a different trace. For example, this can be used in batching
// operations, where a single batch handler processes multiple requests from
// different traces or when the handler receives a request from a different
// project.
type SpanLink struct {
// A unique identifier of a trace that this linked span is part of. The ID is a
// 16-byte array.
TraceID TraceID `json:"traceId,omitempty"`
// A unique identifier for the linked span. The ID is an 8-byte array.
SpanID SpanID `json:"spanId,omitempty"`
// The trace_state associated with the link.
TraceState string `json:"traceState,omitempty"`
// attributes is a collection of attribute key/value pairs on the link.
// Attribute keys MUST be unique (it is not allowed to have more than one
// attribute with the same key).
Attrs []Attr `json:"attributes,omitempty"`
// dropped_attributes_count is the number of dropped attributes. If the value is 0,
// then no attributes were dropped.
DroppedAttrs uint32 `json:"droppedAttributesCount,omitempty"`
// Flags, a bit field.
//
// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace
// Context specification. To read the 8-bit W3C trace flag, use
// `flags & SPAN_FLAGS_TRACE_FLAGS_MASK`.
//
// See https://www.w3.org/TR/trace-context-2/#trace-flags for the flag definitions.
//
// Bits 8 and 9 represent the 3 states of whether the link is remote.
// The states are (unknown, is not remote, is remote).
// To read whether the value is known, use `(flags & SPAN_FLAGS_CONTEXT_HAS_IS_REMOTE_MASK) != 0`.
// To read whether the link is remote, use `(flags & SPAN_FLAGS_CONTEXT_IS_REMOTE_MASK) != 0`.
//
// Readers MUST NOT assume that bits 10-31 (22 most significant bits) will be zero.
// When creating new spans, bits 10-31 (most-significant 22-bits) MUST be zero.
//
// [Optional].
Flags uint32 `json:"flags,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into sl.
func (sl *SpanLink) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid SpanLink type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid SpanLink field: %#v", keyIface)
}
switch key {
case "traceId", "trace_id":
err = decoder.Decode(&sl.TraceID)
case "spanId", "span_id":
err = decoder.Decode(&sl.SpanID)
case "traceState", "trace_state":
err = decoder.Decode(&sl.TraceState)
case "attributes":
err = decoder.Decode(&sl.Attrs)
case "droppedAttributesCount", "dropped_attributes_count":
err = decoder.Decode(&sl.DroppedAttrs)
case "flags":
err = decoder.Decode(&sl.Flags)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
// StatusCode is the status of a Span.
//
// For the semantics of status codes see
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status
type StatusCode int32
const (
// StatusCodeUnset is the default status.
StatusCodeUnset StatusCode = 0
// StatusCodeOK is used when the Span has been validated by an Application
// developer or Operator to have completed successfully.
StatusCodeOK StatusCode = 1
// StatusCodeError is used when the Span contains an error.
StatusCodeError StatusCode = 2
)
var statusCodeStrings = []string{
"Unset",
"OK",
"Error",
}
func (s StatusCode) String() string {
if s >= 0 && int(s) < len(statusCodeStrings) {
return statusCodeStrings[s]
}
return "<unknown telemetry.StatusCode>"
}
// Status defines a logical error model that is suitable for different
// programming environments, including REST APIs and RPC APIs.
type Status struct {
// A developer-facing human readable error message.
Message string `json:"message,omitempty"`
// The status code.
Code StatusCode `json:"code,omitempty"`
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
)
// Traces represents the traces data that can be stored in a persistent storage,
// OR can be embedded by other protocols that transfer OTLP traces data but do
// not implement the OTLP protocol.
//
// The main difference between this message and collector protocol is that
// in this message there will not be any "control" or "metadata" specific to
// OTLP protocol.
//
// When new fields are added into this message, the OTLP request MUST be updated
// as well.
type Traces struct {
// An array of ResourceSpans.
// For data coming from a single resource this array will typically contain
// one element. Intermediary nodes that receive data from multiple origins
// typically batch the data before forwarding further and in that case this
// array will contain multiple elements.
ResourceSpans []*ResourceSpans `json:"resourceSpans,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into td.
func (td *Traces) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid TracesData type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid TracesData field: %#v", keyIface)
}
switch key {
case "resourceSpans", "resource_spans":
err = decoder.Decode(&td.ResourceSpans)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// ResourceSpans is a collection of ScopeSpans from a Resource.
type ResourceSpans struct {
// The resource for the spans in this message.
// If this field is not set then no resource info is known.
Resource Resource `json:"resource"`
// A list of ScopeSpans that originate from a resource.
ScopeSpans []*ScopeSpans `json:"scopeSpans,omitempty"`
// This schema_url applies to the data in the "resource" field. It does not apply
// to the data in the "scope_spans" field which have their own schema_url field.
SchemaURL string `json:"schemaUrl,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into rs.
func (rs *ResourceSpans) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid ResourceSpans type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid ResourceSpans field: %#v", keyIface)
}
switch key {
case "resource":
err = decoder.Decode(&rs.Resource)
case "scopeSpans", "scope_spans":
err = decoder.Decode(&rs.ScopeSpans)
case "schemaUrl", "schema_url":
err = decoder.Decode(&rs.SchemaURL)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// ScopeSpans is a collection of Spans produced by an InstrumentationScope.
type ScopeSpans struct {
// The instrumentation scope information for the spans in this message.
// Semantically, when InstrumentationScope isn't set, it is equivalent to
// an empty instrumentation scope name (unknown).
Scope *Scope `json:"scope"`
// A list of Spans that originate from an instrumentation scope.
Spans []*Span `json:"spans,omitempty"`
// The Schema URL, if known. This is the identifier of the Schema that the span data
// is recorded in. To learn more about Schema URL see
// https://opentelemetry.io/docs/specs/otel/schemas/#schema-url
// This schema_url applies to all spans and span events in the "spans" field.
SchemaURL string `json:"schemaUrl,omitempty"`
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into ss.
func (ss *ScopeSpans) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid ScopeSpans type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid ScopeSpans field: %#v", keyIface)
}
switch key {
case "scope":
err = decoder.Decode(&ss.Scope)
case "spans":
err = decoder.Decode(&ss.Spans)
case "schemaUrl", "schema_url":
err = decoder.Decode(&ss.SchemaURL)
default:
// Skip unknown.
}
if err != nil {
return err
}
}
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
import (
"bytes"
"cmp"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"slices"
"strconv"
"unsafe"
)
// A Value represents a structured value.
// A zero value is valid and represents an empty value.
type Value struct {
// Ensure forward compatibility by explicitly making this not comparable.
noCmp [0]func() //nolint: unused // This is indeed used.
// num holds the value for Int64, Float64, and Bool. It holds the length
// for String, Bytes, Slice, Map.
num uint64
// any holds either the KindBool, KindInt64, KindFloat64, stringptr,
// bytesptr, sliceptr, or mapptr. If KindBool, KindInt64, or KindFloat64
// then the value of Value is in num as described above. Otherwise, it
// contains the value wrapped in the appropriate type.
any any
}
type (
// stringptr represents a value in Value.any for KindString Values.
stringptr *byte
// bytesptr represents a value in Value.any for KindBytes Values.
bytesptr *byte
// sliceptr represents a value in Value.any for KindSlice Values.
sliceptr *Value
// mapptr represents a value in Value.any for KindMap Values.
mapptr *Attr
)
// ValueKind is the kind of a [Value].
type ValueKind int
// ValueKind values.
const (
ValueKindEmpty ValueKind = iota
ValueKindBool
ValueKindFloat64
ValueKindInt64
ValueKindString
ValueKindBytes
ValueKindSlice
ValueKindMap
)
var valueKindStrings = []string{
"Empty",
"Bool",
"Float64",
"Int64",
"String",
"Bytes",
"Slice",
"Map",
}
func (k ValueKind) String() string {
if k >= 0 && int(k) < len(valueKindStrings) {
return valueKindStrings[k]
}
return "<unknown telemetry.ValueKind>"
}
// StringValue returns a new [Value] for a string.
func StringValue(v string) Value {
return Value{
num: uint64(len(v)),
any: stringptr(unsafe.StringData(v)),
}
}
// IntValue returns a [Value] for an int.
func IntValue(v int) Value { return Int64Value(int64(v)) }
// Int64Value returns a [Value] for an int64.
func Int64Value(v int64) Value {
return Value{
num: uint64(v), // nolint: gosec // Store raw bytes.
any: ValueKindInt64,
}
}
// Float64Value returns a [Value] for a float64.
func Float64Value(v float64) Value {
return Value{num: math.Float64bits(v), any: ValueKindFloat64}
}
// BoolValue returns a [Value] for a bool.
func BoolValue(v bool) Value { //nolint:revive // Not a control flag.
var n uint64
if v {
n = 1
}
return Value{num: n, any: ValueKindBool}
}
// BytesValue returns a [Value] for a byte slice. The passed slice must not be
// changed after it is passed.
func BytesValue(v []byte) Value {
return Value{
num: uint64(len(v)),
any: bytesptr(unsafe.SliceData(v)),
}
}
// SliceValue returns a [Value] for a slice of [Value]. The passed slice must
// not be changed after it is passed.
func SliceValue(vs ...Value) Value {
return Value{
num: uint64(len(vs)),
any: sliceptr(unsafe.SliceData(vs)),
}
}
// MapValue returns a new [Value] for a slice of key-value pairs. The passed
// slice must not be changed after it is passed.
func MapValue(kvs ...Attr) Value {
return Value{
num: uint64(len(kvs)),
any: mapptr(unsafe.SliceData(kvs)),
}
}
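// Illustrative sketch of constructing and inspecting values:
//
//	v := SliceValue(Int64Value(1), StringValue("two"))
//	_ = v.Kind()                  // ValueKindSlice
//	_ = v.AsSlice()[1].AsString() // "two"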
// AsString returns the value held by v as a string.
func (v Value) AsString() string {
if sp, ok := v.any.(stringptr); ok {
return unsafe.String(sp, v.num)
}
// TODO: error handle
return ""
}
// asString returns the value held by v as a string. It will panic if the Value
// is not KindString.
func (v Value) asString() string {
return unsafe.String(v.any.(stringptr), v.num)
}
// AsInt64 returns the value held by v as an int64.
func (v Value) AsInt64() int64 {
if v.Kind() != ValueKindInt64 {
// TODO: error handle
return 0
}
return v.asInt64()
}
// asInt64 returns the value held by v as an int64. If v is not of KindInt64,
// this will return garbage.
func (v Value) asInt64() int64 {
// Assumes v.num was a valid int64 (overflow not checked).
return int64(v.num) // nolint: gosec
}
// AsBool returns the value held by v as a bool.
func (v Value) AsBool() bool {
if v.Kind() != ValueKindBool {
// TODO: error handle
return false
}
return v.asBool()
}
// asBool returns the value held by v as a bool. If v is not of KindBool, this
// will return garbage.
func (v Value) asBool() bool { return v.num == 1 }
// AsFloat64 returns the value held by v as a float64.
func (v Value) AsFloat64() float64 {
if v.Kind() != ValueKindFloat64 {
// TODO: error handle
return 0
}
return v.asFloat64()
}
// asFloat64 returns the value held by v as a float64. If v is not of
// KindFloat64, this will return garbage.
func (v Value) asFloat64() float64 { return math.Float64frombits(v.num) }
// AsBytes returns the value held by v as a []byte.
func (v Value) AsBytes() []byte {
if sp, ok := v.any.(bytesptr); ok {
return unsafe.Slice((*byte)(sp), v.num)
}
// TODO: error handle
return nil
}
// asBytes returns the value held by v as a []byte. It will panic if the Value
// is not KindBytes.
func (v Value) asBytes() []byte {
return unsafe.Slice((*byte)(v.any.(bytesptr)), v.num)
}
// AsSlice returns the value held by v as a []Value.
func (v Value) AsSlice() []Value {
if sp, ok := v.any.(sliceptr); ok {
return unsafe.Slice((*Value)(sp), v.num)
}
// TODO: error handle
return nil
}
// asSlice returns the value held by v as a []Value. It will panic if the Value
// is not KindSlice.
func (v Value) asSlice() []Value {
return unsafe.Slice((*Value)(v.any.(sliceptr)), v.num)
}
// AsMap returns the value held by v as a []Attr.
func (v Value) AsMap() []Attr {
if sp, ok := v.any.(mapptr); ok {
return unsafe.Slice((*Attr)(sp), v.num)
}
// TODO: error handle
return nil
}
// asMap returns the value held by v as a []Attr. It will panic if the
// Value is not KindMap.
func (v Value) asMap() []Attr {
return unsafe.Slice((*Attr)(v.any.(mapptr)), v.num)
}
// Kind returns the Kind of v.
func (v Value) Kind() ValueKind {
switch x := v.any.(type) {
case ValueKind:
return x
case stringptr:
return ValueKindString
case bytesptr:
return ValueKindBytes
case sliceptr:
return ValueKindSlice
case mapptr:
return ValueKindMap
default:
return ValueKindEmpty
}
}
// Empty reports whether v does not hold any value.
func (v Value) Empty() bool { return v.Kind() == ValueKindEmpty }
// Equal reports whether v is equal to w.
func (v Value) Equal(w Value) bool {
k1 := v.Kind()
k2 := w.Kind()
if k1 != k2 {
return false
}
switch k1 {
case ValueKindInt64, ValueKindBool:
return v.num == w.num
case ValueKindString:
return v.asString() == w.asString()
case ValueKindFloat64:
return v.asFloat64() == w.asFloat64()
case ValueKindSlice:
return slices.EqualFunc(v.asSlice(), w.asSlice(), Value.Equal)
case ValueKindMap:
sv := sortMap(v.asMap())
sw := sortMap(w.asMap())
return slices.EqualFunc(sv, sw, Attr.Equal)
case ValueKindBytes:
return bytes.Equal(v.asBytes(), w.asBytes())
case ValueKindEmpty:
return true
default:
// TODO: error handle
return false
}
}
func sortMap(m []Attr) []Attr {
sm := make([]Attr, len(m))
copy(sm, m)
slices.SortFunc(sm, func(a, b Attr) int {
return cmp.Compare(a.Key, b.Key)
})
return sm
}
// String returns Value's value as a string, formatted like [fmt.Sprint].
//
// The returned string is meant for debugging;
// the string representation is not stable.
func (v Value) String() string {
switch v.Kind() {
case ValueKindString:
return v.asString()
case ValueKindInt64:
// Assumes v.num was a valid int64 (overflow not checked).
return strconv.FormatInt(int64(v.num), 10) // nolint: gosec
case ValueKindFloat64:
return strconv.FormatFloat(v.asFloat64(), 'g', -1, 64)
case ValueKindBool:
return strconv.FormatBool(v.asBool())
case ValueKindBytes:
return string(v.asBytes())
case ValueKindMap:
return fmt.Sprint(v.asMap())
case ValueKindSlice:
return fmt.Sprint(v.asSlice())
case ValueKindEmpty:
return "<nil>"
default:
// Try to handle this as gracefully as possible.
//
// Don't panic here. The goal is for developers to find this first if
// a ValueKind is not handled. It is preferable to have users open an
// issue asking why their attributes have an "unhandled: " prefix than
// to say that their code is panicking.
return fmt.Sprintf("<unhandled telemetry.ValueKind: %s>", v.Kind())
}
}
// MarshalJSON encodes v into OTLP formatted JSON.
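//
// For example (illustrative); note that intValue is encoded as a JSON string:
//
//	v := StringValue("svc")
//	b, _ := json.Marshal(&v) // {"stringValue":"svc"}
//	w := Int64Value(42)
//	b, _ = json.Marshal(&w) // {"intValue":"42"}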
func (v *Value) MarshalJSON() ([]byte, error) {
switch v.Kind() {
case ValueKindString:
return json.Marshal(struct {
Value string `json:"stringValue"`
}{v.asString()})
case ValueKindInt64:
return json.Marshal(struct {
Value string `json:"intValue"`
}{strconv.FormatInt(int64(v.num), 10)}) // nolint: gosec // From raw bytes.
case ValueKindFloat64:
return json.Marshal(struct {
Value float64 `json:"doubleValue"`
}{v.asFloat64()})
case ValueKindBool:
return json.Marshal(struct {
Value bool `json:"boolValue"`
}{v.asBool()})
case ValueKindBytes:
return json.Marshal(struct {
Value []byte `json:"bytesValue"`
}{v.asBytes()})
case ValueKindMap:
return json.Marshal(struct {
Value struct {
Values []Attr `json:"values"`
} `json:"kvlistValue"`
}{struct {
Values []Attr `json:"values"`
}{v.asMap()}})
case ValueKindSlice:
return json.Marshal(struct {
Value struct {
Values []Value `json:"values"`
} `json:"arrayValue"`
}{struct {
Values []Value `json:"values"`
}{v.asSlice()}})
case ValueKindEmpty:
return nil, nil
default:
return nil, fmt.Errorf("unknown Value kind: %s", v.Kind().String())
}
}
// UnmarshalJSON decodes the OTLP formatted JSON contained in data into v.
func (v *Value) UnmarshalJSON(data []byte) error {
decoder := json.NewDecoder(bytes.NewReader(data))
t, err := decoder.Token()
if err != nil {
return err
}
if t != json.Delim('{') {
return errors.New("invalid Value type")
}
for decoder.More() {
keyIface, err := decoder.Token()
if err != nil {
if errors.Is(err, io.EOF) {
// Empty.
return nil
}
return err
}
key, ok := keyIface.(string)
if !ok {
return fmt.Errorf("invalid Value key: %#v", keyIface)
}
switch key {
case "stringValue", "string_value":
var val string
err = decoder.Decode(&val)
*v = StringValue(val)
case "boolValue", "bool_value":
var val bool
err = decoder.Decode(&val)
*v = BoolValue(val)
case "intValue", "int_value":
var val protoInt64
err = decoder.Decode(&val)
*v = Int64Value(val.Int64())
case "doubleValue", "double_value":
var val float64
err = decoder.Decode(&val)
*v = Float64Value(val)
case "bytesValue", "bytes_value":
var val64 string
if err := decoder.Decode(&val64); err != nil {
return err
}
var val []byte
val, err = base64.StdEncoding.DecodeString(val64)
*v = BytesValue(val)
case "arrayValue", "array_value":
var val struct{ Values []Value }
err = decoder.Decode(&val)
*v = SliceValue(val.Values...)
case "kvlistValue", "kvlist_value":
var val struct{ Values []Attr }
err = decoder.Decode(&val)
*v = MapValue(val.Values...)
default:
// Skip unknown.
continue
}
// Use first valid. Ignore the rest.
return err
}
// Only unknown fields. Return nil without unmarshaling any value.
return nil
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
// nonRecordingSpan is a minimal implementation of a Span that wraps a
// SpanContext. It performs no operations other than to return the wrapped
// SpanContext.
type nonRecordingSpan struct {
noopSpan
sc SpanContext
}
// SpanContext returns the wrapped SpanContext.
func (s nonRecordingSpan) SpanContext() SpanContext { return s.sc }
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace/embedded"
)
// NewNoopTracerProvider returns an implementation of TracerProvider that
// performs no operations. The Tracer and Spans created from the returned
// TracerProvider also perform no operations.
//
// Deprecated: Use [go.opentelemetry.io/otel/trace/noop.NewTracerProvider]
// instead.
func NewNoopTracerProvider() TracerProvider {
return noopTracerProvider{}
}
type noopTracerProvider struct{ embedded.TracerProvider }
var _ TracerProvider = noopTracerProvider{}
// Tracer returns a noop implementation of Tracer.
func (noopTracerProvider) Tracer(string, ...TracerOption) Tracer {
return noopTracer{}
}
// noopTracer is an implementation of Tracer that performs no operations.
type noopTracer struct{ embedded.Tracer }
var _ Tracer = noopTracer{}
// Start carries forward a non-recording Span, if one is present in the context, otherwise it
// creates a no-op Span.
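//
// A sketch of the carry-forward behavior (sc is some SpanContext):
//
//	ctx := ContextWithSpanContext(context.Background(), sc)
//	_, span := noopTracer{}.Start(ctx, "op")
//	// span is the non-recording Span wrapping sc, not a new noopSpan.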
func (noopTracer) Start(ctx context.Context, _ string, _ ...SpanStartOption) (context.Context, Span) {
span := SpanFromContext(ctx)
if _, ok := span.(nonRecordingSpan); !ok {
// span is likely already a noopSpan, but let's be sure
span = noopSpanInstance
}
return ContextWithSpan(ctx, span), span
}
// noopSpan is an implementation of Span that performs no operations.
type noopSpan struct{ embedded.Span }
var noopSpanInstance Span = noopSpan{}
// SpanContext returns an empty span context.
func (noopSpan) SpanContext() SpanContext { return SpanContext{} }
// IsRecording always returns false.
func (noopSpan) IsRecording() bool { return false }
// SetStatus does nothing.
func (noopSpan) SetStatus(codes.Code, string) {}
// SetError does nothing.
func (noopSpan) SetError(bool) {}
// SetAttributes does nothing.
func (noopSpan) SetAttributes(...attribute.KeyValue) {}
// End does nothing.
func (noopSpan) End(...SpanEndOption) {}
// RecordError does nothing.
func (noopSpan) RecordError(error, ...EventOption) {}
// AddEvent does nothing.
func (noopSpan) AddEvent(string, ...EventOption) {}
// AddLink does nothing.
func (noopSpan) AddLink(Link) {}
// SetName does nothing.
func (noopSpan) SetName(string) {}
// TracerProvider returns a no-op TracerProvider.
func (s noopSpan) TracerProvider() TracerProvider {
return s.tracerProvider(autoInstEnabled)
}
// autoInstEnabled reports whether the auto-instrumentation SDK is enabled.
//
// The auto-instrumentation is expected to overwrite this value to true when it
// attaches to the process.
var autoInstEnabled = new(bool)
// tracerProvider returns a noopTracerProvider if autoEnabled is false,
// otherwise it will return a TracerProvider from the sdk package used in
// auto-instrumentation.
//
//go:noinline
func (noopSpan) tracerProvider(autoEnabled *bool) TracerProvider {
if *autoEnabled {
return newAutoTracerProvider()
}
return noopTracerProvider{}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace/embedded"
)
// Span is the individual component of a trace. It represents a single named
// and timed operation of a workflow that is traced. A Tracer is used to
// create a Span and it is then up to the operation the Span represents to
// properly end the Span when the operation itself ends.
//
// Warning: Methods may be added to this interface in minor releases. See
// package documentation on API implementation for information on how to set
// default behavior for unimplemented methods.
type Span interface {
// Users of the interface can ignore this. This embedded type is only used
// by implementations of this interface. See the "API Implementations"
// section of the package documentation for more information.
embedded.Span
// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
// is called. Therefore, updates to the Span are not allowed after this
// method has been called.
End(options ...SpanEndOption)
// AddEvent adds an event with the provided name and options.
AddEvent(name string, options ...EventOption)
// AddLink adds a link.
// Adding links at span creation using WithLinks is preferred to calling AddLink
// later, for contexts that are available during span creation, because head
// sampling decisions can only consider information present during span creation.
AddLink(link Link)
// IsRecording returns the recording state of the Span. It will return
// true if the Span is active and events can be recorded.
IsRecording() bool
// RecordError will record err as an exception span event for this span. An
// additional call to SetStatus is required if the Status of the Span should
// be set to Error, as this method does not change the Span status. If this
// span is not being recorded or err is nil then this method does nothing.
RecordError(err error, options ...EventOption)
// SpanContext returns the SpanContext of the Span. The returned SpanContext
// is usable even after the End method has been called for the Span.
SpanContext() SpanContext
// SetStatus sets the status of the Span in the form of a code and a
// description, provided the status hasn't already been set to a higher
// value before (OK > Error > Unset). The description is only included in a
// status when the code is for an error.
SetStatus(code codes.Code, description string)
// SetName sets the Span name.
SetName(name string)
// SetAttributes sets kv as attributes of the Span. If a key from kv
// already exists for an attribute of the Span it will be overwritten with
// the value contained in kv.
//
// Note that adding attributes at span creation using [WithAttributes] is preferred
// to calling SetAttributes later, as samplers can only consider information
// already present during span creation.
SetAttributes(kv ...attribute.KeyValue)
// TracerProvider returns a TracerProvider that can be used to generate
// additional Spans on the same telemetry pipeline as the current Span.
TracerProvider() TracerProvider
}
// Link is the relationship between two Spans. The relationship can be within
// the same Trace or across different Traces.
//
// For example, a Link is used in the following situations:
//
// 1. Batch Processing: A batch of operations may contain operations
// associated with one or more traces/spans. Since there can only be one
// parent SpanContext, a Link is used to keep reference to the
// SpanContext of all operations in the batch.
// 2. Public Endpoint: A SpanContext for an incoming client request on a
// public endpoint should be considered untrusted. In such a case, a new
// trace with its own identity and sampling decision needs to be created,
// but this new trace needs to be related to the original trace in some
// form. A Link is used to keep reference to the original SpanContext and
// track the relationship.
type Link struct {
// SpanContext of the linked Span.
SpanContext SpanContext
// Attributes describe the aspects of the link.
Attributes []attribute.KeyValue
}
// LinkFromContext returns a link encapsulating the SpanContext in the provided
// ctx.
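//
// Illustrative sketch, linking a new span to the span in ctx (assumes a
// tracer obtained elsewhere):
//
//	link := LinkFromContext(ctx, attribute.String("source", "batch"))
//	_, span := tracer.Start(context.Background(), "process", WithLinks(link))
//	defer span.End()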
func LinkFromContext(ctx context.Context, attrs ...attribute.KeyValue) Link {
return Link{
SpanContext: SpanContextFromContext(ctx),
Attributes: attrs,
}
}
// SpanKind is the role a Span plays in a Trace.
type SpanKind int
// As a convenience, these match the proto definition, see
// https://github.com/open-telemetry/opentelemetry-proto/blob/30d237e1ff3ab7aa50e0922b5bebdd93505090af/opentelemetry/proto/trace/v1/trace.proto#L101-L129
//
// The unspecified value is not a valid `SpanKind`. Use `ValidateSpanKind()`
// to coerce a span kind to a valid value.
const (
// SpanKindUnspecified is an unspecified SpanKind and is not a valid
// SpanKind. SpanKindUnspecified should be replaced with SpanKindInternal
// if it is received.
SpanKindUnspecified SpanKind = 0
// SpanKindInternal is a SpanKind for a Span that represents an internal
// operation within an application.
SpanKindInternal SpanKind = 1
// SpanKindServer is a SpanKind for a Span that represents the operation
// of handling a request from a client.
SpanKindServer SpanKind = 2
// SpanKindClient is a SpanKind for a Span that represents the operation
// of a client making a request to a server.
SpanKindClient SpanKind = 3
// SpanKindProducer is a SpanKind for a Span that represents the operation
// of a producer sending a message to a message broker. Unlike
// SpanKindClient and SpanKindServer, there is often no direct
// relationship between this kind of Span and a SpanKindConsumer kind. A
// SpanKindProducer Span will end once the message is accepted by the
// message broker which might not overlap with the processing of that
// message.
SpanKindProducer SpanKind = 4
// SpanKindConsumer is a SpanKind for a Span that represents the operation
// of a consumer receiving a message from a message broker. Like
// SpanKindProducer Spans, there is often no direct relationship between
// this Span and the Span that produced the message.
SpanKindConsumer SpanKind = 5
)
// ValidateSpanKind returns a valid span kind value. This will coerce
// invalid values into the default value, SpanKindInternal.
func ValidateSpanKind(spanKind SpanKind) SpanKind {
switch spanKind {
case SpanKindInternal,
SpanKindServer,
SpanKindClient,
SpanKindProducer,
SpanKindConsumer:
// valid
return spanKind
default:
return SpanKindInternal
}
}
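// Illustrative sketch: a kind decoded from an external source (e.g. a
// protobuf enum value) is normalized before use.
func normalizeKind(raw int32) SpanKind {
	// Unknown or unspecified values are coerced to SpanKindInternal.
	return ValidateSpanKind(SpanKind(raw))
}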
// String returns the specified name of the SpanKind in lower-case.
func (sk SpanKind) String() string {
switch sk {
case SpanKindInternal:
return "internal"
case SpanKindServer:
return "server"
case SpanKindClient:
return "client"
case SpanKindProducer:
return "producer"
case SpanKindConsumer:
return "consumer"
default:
return "unspecified"
}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"encoding/json"
)
const (
// FlagsSampled is a bitmask with the sampled bit set. A SpanContext
// with the sampling bit set means the span is sampled.
FlagsSampled = TraceFlags(0x01)
errInvalidHexID errorConst = "trace-id and span-id can only contain [0-9a-f] characters, all lowercase"
errInvalidTraceIDLength errorConst = "hex encoded trace-id must have a length of 32"
errNilTraceID errorConst = "trace-id can't be all zero"
errInvalidSpanIDLength errorConst = "hex encoded span-id must have a length of 16"
errNilSpanID errorConst = "span-id can't be all zero"
)
type errorConst string
func (e errorConst) Error() string {
return string(e)
}
// TraceID is a unique identity of a trace.
// nolint:revive // revive complains about stutter of `trace.TraceID`.
type TraceID [16]byte
var (
nilTraceID TraceID
_ json.Marshaler = nilTraceID
)
// IsValid reports whether the TraceID is valid. A valid trace ID does
// not consist of zeros only.
func (t TraceID) IsValid() bool {
return t != nilTraceID
}
// MarshalJSON implements a custom marshal function to encode TraceID
// as a hex string.
func (t TraceID) MarshalJSON() ([]byte, error) {
b := [32 + 2]byte{0: '"', 33: '"'}
h := t.hexBytes()
copy(b[1:], h[:])
return b[:], nil
}
// String returns the hex string representation of the TraceID.
func (t TraceID) String() string {
h := t.hexBytes()
return string(h[:])
}
// hexBytes returns the lowercase hex representation of the TraceID as a
// fixed-size byte array.
func (t TraceID) hexBytes() [32]byte {
return [32]byte{
hexLU[t[0x0]>>4], hexLU[t[0x0]&0xf],
hexLU[t[0x1]>>4], hexLU[t[0x1]&0xf],
hexLU[t[0x2]>>4], hexLU[t[0x2]&0xf],
hexLU[t[0x3]>>4], hexLU[t[0x3]&0xf],
hexLU[t[0x4]>>4], hexLU[t[0x4]&0xf],
hexLU[t[0x5]>>4], hexLU[t[0x5]&0xf],
hexLU[t[0x6]>>4], hexLU[t[0x6]&0xf],
hexLU[t[0x7]>>4], hexLU[t[0x7]&0xf],
hexLU[t[0x8]>>4], hexLU[t[0x8]&0xf],
hexLU[t[0x9]>>4], hexLU[t[0x9]&0xf],
hexLU[t[0xa]>>4], hexLU[t[0xa]&0xf],
hexLU[t[0xb]>>4], hexLU[t[0xb]&0xf],
hexLU[t[0xc]>>4], hexLU[t[0xc]&0xf],
hexLU[t[0xd]>>4], hexLU[t[0xd]&0xf],
hexLU[t[0xe]>>4], hexLU[t[0xe]&0xf],
hexLU[t[0xf]>>4], hexLU[t[0xf]&0xf],
}
}
// SpanID is a unique identity of a span in a trace.
type SpanID [8]byte
var (
nilSpanID SpanID
_ json.Marshaler = nilSpanID
)
// IsValid reports whether the SpanID is valid. A valid SpanID does not consist
// of zeros only.
func (s SpanID) IsValid() bool {
return s != nilSpanID
}
// MarshalJSON implements a custom marshal function to encode SpanID
// as a hex string.
func (s SpanID) MarshalJSON() ([]byte, error) {
b := [16 + 2]byte{0: '"', 17: '"'}
h := s.hexBytes()
copy(b[1:], h[:])
return b[:], nil
}
// String returns the hex string representation of the SpanID.
func (s SpanID) String() string {
b := s.hexBytes()
return string(b[:])
}
// hexBytes returns the lowercase hex representation of the SpanID as a
// fixed-size byte array.
func (s SpanID) hexBytes() [16]byte {
return [16]byte{
hexLU[s[0]>>4], hexLU[s[0]&0xf],
hexLU[s[1]>>4], hexLU[s[1]&0xf],
hexLU[s[2]>>4], hexLU[s[2]&0xf],
hexLU[s[3]>>4], hexLU[s[3]&0xf],
hexLU[s[4]>>4], hexLU[s[4]&0xf],
hexLU[s[5]>>4], hexLU[s[5]&0xf],
hexLU[s[6]>>4], hexLU[s[6]&0xf],
hexLU[s[7]>>4], hexLU[s[7]&0xf],
}
}
// TraceIDFromHex returns a TraceID from a hex string if it is compliant with
// the W3C trace-context specification. See more at
// https://www.w3.org/TR/trace-context/#trace-id
// nolint:revive // revive complains about stutter of `trace.TraceIDFromHex`.
func TraceIDFromHex(h string) (TraceID, error) {
if len(h) != 32 {
return [16]byte{}, errInvalidTraceIDLength
}
var b [16]byte
invalidMark := byte(0)
for i := 0; i < len(h); i += 4 {
b[i/2] = (hexRev[h[i]] << 4) | hexRev[h[i+1]]
b[i/2+1] = (hexRev[h[i+2]] << 4) | hexRev[h[i+3]]
invalidMark |= hexRev[h[i]] | hexRev[h[i+1]] | hexRev[h[i+2]] | hexRev[h[i+3]]
}
// If the upper 4 bits of any byte are not zero, there was an invalid hex
// character since invalid hex characters are 0xff in hexRev.
if invalidMark&0xf0 != 0 {
return [16]byte{}, errInvalidHexID
}
// If we didn't set any bits, then h was all zeros.
if invalidMark == 0 {
return [16]byte{}, errNilTraceID
}
return b, nil
}
// SpanIDFromHex returns a SpanID from a hex string if it is compliant
// with the W3C trace-context specification.
// See more at https://www.w3.org/TR/trace-context/#parent-id
func SpanIDFromHex(h string) (SpanID, error) {
if len(h) != 16 {
return [8]byte{}, errInvalidSpanIDLength
}
var b [8]byte
invalidMark := byte(0)
for i := 0; i < len(h); i += 4 {
b[i/2] = (hexRev[h[i]] << 4) | hexRev[h[i+1]]
b[i/2+1] = (hexRev[h[i+2]] << 4) | hexRev[h[i+3]]
invalidMark |= hexRev[h[i]] | hexRev[h[i+1]] | hexRev[h[i+2]] | hexRev[h[i+3]]
}
// If the upper 4 bits of any byte are not zero, there was an invalid hex
// character since invalid hex characters are 0xff in hexRev.
if invalidMark&0xf0 != 0 {
return [8]byte{}, errInvalidHexID
}
// If we didn't set any bits, then h was all zeros.
if invalidMark == 0 {
return [8]byte{}, errNilSpanID
}
return b, nil
}
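// Illustrative sketch (parseIDs is a hypothetical helper): parse the two ID
// fields of a W3C traceparent header; String reproduces the canonical
// lowercase hex form of each.
func parseIDs(traceHex, spanHex string) (TraceID, SpanID, error) {
	tid, err := TraceIDFromHex(traceHex) // e.g. "4bf92f3577b34da6a3ce929d0e0e4736"
	if err != nil {
		return TraceID{}, SpanID{}, err
	}
	sid, err := SpanIDFromHex(spanHex) // e.g. "00f067aa0ba902b7"
	if err != nil {
		return TraceID{}, SpanID{}, err
	}
	return tid, sid, nil
}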
// TraceFlags contains flags that can be set on a SpanContext.
type TraceFlags byte //nolint:revive // revive complains about stutter of `trace.TraceFlags`.
// IsSampled reports whether the sampling bit is set in the TraceFlags.
func (tf TraceFlags) IsSampled() bool {
return tf&FlagsSampled == FlagsSampled
}
// WithSampled sets the sampling bit in a new copy of the TraceFlags.
func (tf TraceFlags) WithSampled(sampled bool) TraceFlags { // nolint:revive // sampled is not a control flag.
if sampled {
return tf | FlagsSampled
}
return tf &^ FlagsSampled
}
// MarshalJSON implements a custom marshal function to encode TraceFlags
// as a hex string.
func (tf TraceFlags) MarshalJSON() ([]byte, error) {
b := [2 + 2]byte{0: '"', 3: '"'}
h := tf.hexBytes()
copy(b[1:], h[:])
return b[:], nil
}
// String returns the hex string representation of the TraceFlags.
func (tf TraceFlags) String() string {
h := tf.hexBytes()
return string(h[:])
}
// hexBytes returns the lowercase hex representation of the TraceFlags as a
// two-byte array.
func (tf TraceFlags) hexBytes() [2]byte {
return [2]byte{hexLU[tf>>4], hexLU[tf&0xf]}
}
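// Illustrative sketch: TraceFlags values are manipulated by copy, never in
// place.
func setSampled(tf TraceFlags) TraceFlags {
	out := tf.WithSampled(true) // tf itself is unchanged
	_ = out.IsSampled()         // always true here
	return out                  // e.g. String() == "01" when tf was zero
}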
// SpanContextConfig contains mutable fields usable for constructing
// an immutable SpanContext.
type SpanContextConfig struct {
TraceID TraceID
SpanID SpanID
TraceFlags TraceFlags
TraceState TraceState
Remote bool
}
// NewSpanContext constructs a SpanContext using values from the provided
// SpanContextConfig.
func NewSpanContext(config SpanContextConfig) SpanContext {
return SpanContext{
traceID: config.TraceID,
spanID: config.SpanID,
traceFlags: config.TraceFlags,
traceState: config.TraceState,
remote: config.Remote,
}
}
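// Illustrative sketch (remoteSpanContext is a hypothetical helper): how a
// propagator-style component might assemble a SpanContext from decoded
// header values.
func remoteSpanContext(tid TraceID, sid SpanID, sampled bool) SpanContext {
	return NewSpanContext(SpanContextConfig{
		TraceID:    tid,
		SpanID:     sid,
		TraceFlags: TraceFlags(0).WithSampled(sampled),
		Remote:     true, // the span was created outside this process
	})
}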
// SpanContext contains identifying trace information about a Span.
type SpanContext struct {
traceID TraceID
spanID SpanID
traceFlags TraceFlags
traceState TraceState
remote bool
}
var _ json.Marshaler = SpanContext{}
// IsValid reports whether the SpanContext is valid. A valid span context has a
// valid TraceID and SpanID.
func (sc SpanContext) IsValid() bool {
return sc.HasTraceID() && sc.HasSpanID()
}
// IsRemote reports whether the SpanContext represents a remotely-created Span.
func (sc SpanContext) IsRemote() bool {
return sc.remote
}
// WithRemote returns a copy of sc with the Remote property set to remote.
func (sc SpanContext) WithRemote(remote bool) SpanContext {
return SpanContext{
traceID: sc.traceID,
spanID: sc.spanID,
traceFlags: sc.traceFlags,
traceState: sc.traceState,
remote: remote,
}
}
// TraceID returns the TraceID from the SpanContext.
func (sc SpanContext) TraceID() TraceID {
return sc.traceID
}
// HasTraceID reports whether the SpanContext has a valid TraceID.
func (sc SpanContext) HasTraceID() bool {
return sc.traceID.IsValid()
}
// WithTraceID returns a new SpanContext with the TraceID replaced.
func (sc SpanContext) WithTraceID(traceID TraceID) SpanContext {
return SpanContext{
traceID: traceID,
spanID: sc.spanID,
traceFlags: sc.traceFlags,
traceState: sc.traceState,
remote: sc.remote,
}
}
// SpanID returns the SpanID from the SpanContext.
func (sc SpanContext) SpanID() SpanID {
return sc.spanID
}
// HasSpanID reports whether the SpanContext has a valid SpanID.
func (sc SpanContext) HasSpanID() bool {
return sc.spanID.IsValid()
}
// WithSpanID returns a new SpanContext with the SpanID replaced.
func (sc SpanContext) WithSpanID(spanID SpanID) SpanContext {
return SpanContext{
traceID: sc.traceID,
spanID: spanID,
traceFlags: sc.traceFlags,
traceState: sc.traceState,
remote: sc.remote,
}
}
// TraceFlags returns the flags from the SpanContext.
func (sc SpanContext) TraceFlags() TraceFlags {
return sc.traceFlags
}
// IsSampled reports whether the sampling bit is set in the SpanContext's TraceFlags.
func (sc SpanContext) IsSampled() bool {
return sc.traceFlags.IsSampled()
}
// WithTraceFlags returns a new SpanContext with the TraceFlags replaced.
func (sc SpanContext) WithTraceFlags(flags TraceFlags) SpanContext {
return SpanContext{
traceID: sc.traceID,
spanID: sc.spanID,
traceFlags: flags,
traceState: sc.traceState,
remote: sc.remote,
}
}
// TraceState returns the TraceState from the SpanContext.
func (sc SpanContext) TraceState() TraceState {
return sc.traceState
}
// WithTraceState returns a new SpanContext with the TraceState replaced.
func (sc SpanContext) WithTraceState(state TraceState) SpanContext {
return SpanContext{
traceID: sc.traceID,
spanID: sc.spanID,
traceFlags: sc.traceFlags,
traceState: state,
remote: sc.remote,
}
}
// Equal reports whether two SpanContext values are equal.
func (sc SpanContext) Equal(other SpanContext) bool {
return sc.traceID == other.traceID &&
sc.spanID == other.spanID &&
sc.traceFlags == other.traceFlags &&
sc.traceState.String() == other.traceState.String() &&
sc.remote == other.remote
}
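// Illustrative sketch: the With* methods return modified copies, so deriving
// a new context never mutates the original.
func forceSampled(sc SpanContext) SpanContext {
	updated := sc.WithTraceFlags(sc.TraceFlags().WithSampled(true))
	_ = sc.Equal(updated) // false unless sc was already sampled
	return updated
}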
// MarshalJSON implements a custom marshal function to encode a SpanContext.
func (sc SpanContext) MarshalJSON() ([]byte, error) {
return json.Marshal(SpanContextConfig{
TraceID: sc.traceID,
SpanID: sc.spanID,
TraceFlags: sc.traceFlags,
TraceState: sc.traceState,
Remote: sc.remote,
})
}
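// Illustrative sketch of the resulting encoding (assuming a sampled, remote
// context and an empty tracestate; fields appear in SpanContextConfig order):
//
// {"TraceID":"4bf92f3577b34da6a3ce929d0e0e4736","SpanID":"00f067aa0ba902b7","TraceFlags":"01","TraceState":"","Remote":true}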
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package trace // import "go.opentelemetry.io/otel/trace"
import (
"encoding/json"
"fmt"
"strings"
)
const (
maxListMembers = 32
listDelimiters = ","
memberDelimiter = "="
errInvalidKey errorConst = "invalid tracestate key"
errInvalidValue errorConst = "invalid tracestate value"
errInvalidMember errorConst = "invalid tracestate list-member"
errMemberNumber errorConst = "too many list-members in tracestate"
errDuplicate errorConst = "duplicate list-member in tracestate"
)
// member is a single list-member of a TraceState.
type member struct {
Key string
Value string
}
// checkValueChar reports whether v is a valid non-terminal character of a
// tracestate value. According to
//
// chr = %x20 / nblk-chr
// nblk-chr = %x21-2B / %x2D-3C / %x3E-7E
//
// this simplifies to chr = %x20-2B / %x2D-3C / %x3E-7E .
func checkValueChar(v byte) bool {
return v >= '\x20' && v <= '\x7e' && v != '\x2c' && v != '\x3d'
}
// checkValueLast reports whether v is a valid terminal character of a
// tracestate value, according to nblk-chr = %x21-2B / %x2D-3C / %x3E-7E .
func checkValueLast(v byte) bool {
return v >= '\x21' && v <= '\x7e' && v != '\x2c' && v != '\x3d'
}
// checkValue reports whether val is a valid tracestate value, based on the
// W3C Trace Context specification:
//
// value = (0*255(chr)) nblk-chr
// nblk-chr = %x21-2B / %x2D-3C / %x3E-7E
// chr = %x20 / nblk-chr
//
// see https://www.w3.org/TR/trace-context-1/#value
func checkValue(val string) bool {
n := len(val)
if n == 0 || n > 256 {
return false
}
for i := 0; i < n-1; i++ {
if !checkValueChar(val[i]) {
return false
}
}
return checkValueLast(val[n-1])
}
// checkKeyRemain reports whether key (the remainder of a key part after its
// first character) contains only ( lcalpha / DIGIT / "_" / "-" / "*" / "/" ).
func checkKeyRemain(key string) bool {
for _, v := range key {
if isAlphaNum(byte(v)) {
continue
}
switch v {
case '_', '-', '*', '/':
continue
}
return false
}
return true
}
// checkKeyPart reports whether key is a valid simple-key or system-id,
// according to
//
// simple-key = lcalpha (0*255( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ))
// system-id = lcalpha (0*13( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ))
//
// n is the maximum length of the part after the first character: 255 for a
// simple-key, 13 for a system-id.
func checkKeyPart(key string, n int) bool {
if key == "" {
return false
}
first := key[0] // key's first char
ret := len(key[1:]) <= n
ret = ret && first >= 'a' && first <= 'z'
return ret && checkKeyRemain(key[1:])
}
// isAlphaNum reports whether c is a lowercase ASCII letter or a digit.
func isAlphaNum(c byte) bool {
if c >= 'a' && c <= 'z' {
return true
}
return c >= '0' && c <= '9'
}
// checkKeyTenant reports whether key is a valid tenant-id, according to
//
// tenant-id = ( lcalpha / DIGIT ) 0*240( lcalpha / DIGIT / "_" / "-"/ "*" / "/" )
//
// n is the maximum length of the part after the first character; callers
// pass 240.
func checkKeyTenant(key string, n int) bool {
if key == "" {
return false
}
return isAlphaNum(key[0]) && len(key[1:]) <= n && checkKeyRemain(key[1:])
}
// checkKey reports whether key is a valid tracestate key, based on the W3C
// Trace Context specification:
//
// key = simple-key / multi-tenant-key
// simple-key = lcalpha (0*255( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ))
// multi-tenant-key = tenant-id "@" system-id
// tenant-id = ( lcalpha / DIGIT ) (0*240( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ))
// system-id = lcalpha (0*13( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ))
// lcalpha = %x61-7A ; a-z
//
// see https://www.w3.org/TR/trace-context-1/#tracestate-header.
func checkKey(key string) bool {
tenant, system, ok := strings.Cut(key, "@")
if !ok {
return checkKeyPart(key, 255)
}
return checkKeyTenant(tenant, 240) && checkKeyPart(system, 13)
}
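// Illustrative examples of the grammar above (in-package sketch; checkKey is
// unexported, so this only compiles inside the package):
func exampleKeys() map[string]bool {
	return map[string]bool{
		"vendorname1":   checkKey("vendorname1"),   // true: simple-key
		"tenant@system": checkKey("tenant@system"), // true: multi-tenant-key
		"1key":          checkKey("1key"),          // false: simple-key must start with a-z
		"TENANT@system": checkKey("TENANT@system"), // false: upper case is invalid
	}
}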
func newMember(key, value string) (member, error) {
if !checkKey(key) {
return member{}, errInvalidKey
}
if !checkValue(value) {
return member{}, errInvalidValue
}
return member{Key: key, Value: value}, nil
}
func parseMember(m string) (member, error) {
key, val, ok := strings.Cut(m, memberDelimiter)
if !ok {
return member{}, fmt.Errorf("%w: %s", errInvalidMember, m)
}
key = strings.TrimLeft(key, " \t")
val = strings.TrimRight(val, " \t")
result, e := newMember(key, val)
if e != nil {
return member{}, fmt.Errorf("%w: %s", errInvalidMember, m)
}
return result, nil
}
// String encodes member into a string compliant with the W3C Trace Context
// specification.
func (m member) String() string {
return m.Key + "=" + m.Value
}
// TraceState provides additional vendor-specific trace identification
// information across different distributed tracing systems. It represents an
// immutable list consisting of key/value pairs; each pair is referred to as
// a list-member.
//
// TraceState conforms to the W3C Trace Context specification
// (https://www.w3.org/TR/trace-context-1). All operations that create or copy
// a TraceState do so by validating all input and will only produce a
// TraceState that conforms to the specification. Specifically, this means all
// list-member's key/value pairs are valid, no duplicate list-members exist,
// and the maximum number of list-members (32) is not exceeded.
type TraceState struct { //nolint:revive // revive complains about stutter of `trace.TraceState`
// list is the members in order.
list []member
}
var _ json.Marshaler = TraceState{}
// ParseTraceState attempts to decode a TraceState from the passed
// string. It returns an error if the input is invalid according to the W3C
// Trace Context specification.
func ParseTraceState(ts string) (TraceState, error) {
if ts == "" {
return TraceState{}, nil
}
wrapErr := func(err error) error {
return fmt.Errorf("failed to parse tracestate: %w", err)
}
var members []member
found := make(map[string]struct{})
for ts != "" {
var memberStr string
memberStr, ts, _ = strings.Cut(ts, listDelimiters)
if memberStr == "" {
continue
}
m, err := parseMember(memberStr)
if err != nil {
return TraceState{}, wrapErr(err)
}
if _, ok := found[m.Key]; ok {
return TraceState{}, wrapErr(errDuplicate)
}
found[m.Key] = struct{}{}
members = append(members, m)
if n := len(members); n > maxListMembers {
return TraceState{}, wrapErr(errMemberNumber)
}
}
return TraceState{list: members}, nil
}
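// Illustrative sketch: parsing a W3C tracestate header value (the example
// members come from the Trace Context specification).
func parseHeader() (TraceState, error) {
	ts, err := ParseTraceState("rojo=00f067aa0ba902b7,congo=t61rcWkgMzE")
	if err != nil {
		return TraceState{}, err
	}
	_ = ts.Len()       // 2; member order is preserved
	_ = ts.Get("rojo") // "00f067aa0ba902b7"
	return ts, nil
}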
// MarshalJSON marshals the TraceState into JSON.
func (ts TraceState) MarshalJSON() ([]byte, error) {
return json.Marshal(ts.String())
}
// String encodes the TraceState into a string compliant with the W3C
// Trace Context specification. The returned string will be invalid if the
// TraceState contains any invalid members.
func (ts TraceState) String() string {
if len(ts.list) == 0 {
return ""
}
var n int
n += len(ts.list) // member delimiters: '='
n += len(ts.list) - 1 // list delimiters: ','
for _, mem := range ts.list {
n += len(mem.Key)
n += len(mem.Value)
}
var sb strings.Builder
sb.Grow(n)
_, _ = sb.WriteString(ts.list[0].Key)
_ = sb.WriteByte('=')
_, _ = sb.WriteString(ts.list[0].Value)
for i := 1; i < len(ts.list); i++ {
_ = sb.WriteByte(listDelimiters[0])
_, _ = sb.WriteString(ts.list[i].Key)
_ = sb.WriteByte('=')
_, _ = sb.WriteString(ts.list[i].Value)
}
return sb.String()
}
// Get returns the value paired with key from the corresponding TraceState
// list-member if it exists, otherwise an empty string is returned.
func (ts TraceState) Get(key string) string {
for _, member := range ts.list {
if member.Key == key {
return member.Value
}
}
return ""
}
// Walk walks all key/value pairs in the TraceState by calling f.
// Iteration stops if f returns false.
func (ts TraceState) Walk(f func(key, value string) bool) {
for _, m := range ts.list {
if !f(m.Key, m.Value) {
break
}
}
}
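// Illustrative sketch: collecting all keys with Walk.
func keys(ts TraceState) []string {
	out := make([]string, 0, ts.Len())
	ts.Walk(func(key, _ string) bool {
		out = append(out, key)
		return true // keep iterating
	})
	return out
}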
// Insert adds a new list-member defined by the key/value pair to the
// TraceState. If a list-member already exists for the given key, that
// list-member's value is updated. The new or updated list-member is always
// moved to the beginning of the TraceState as specified by the W3C Trace
// Context specification.
//
// If key or value are invalid according to the W3C Trace Context
// specification an error is returned with the original TraceState.
//
// If adding a new list-member means the TraceState would have more members
// than is allowed, the new list-member will be inserted and the right-most
// list-member will be dropped in the returned TraceState.
func (ts TraceState) Insert(key, value string) (TraceState, error) {
m, err := newMember(key, value)
if err != nil {
return ts, err
}
n := len(ts.list)
found := n
for i := range ts.list {
if ts.list[i].Key == key {
found = i
}
}
cTS := TraceState{}
if found == n && n < maxListMembers {
cTS.list = make([]member, n+1)
} else {
cTS.list = make([]member, n)
}
cTS.list[0] = m
// When the number of members exceeds capacity, drop the "right-most".
copy(cTS.list[1:], ts.list[0:found])
if found < n {
copy(cTS.list[1+found:], ts.list[found+1:])
}
return cTS, nil
}
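// Illustrative sketch: Insert returns the updated copy; the original
// TraceState is untouched.
func promote(ts TraceState) TraceState {
	updated, err := ts.Insert("congo", "t61rcWkgMzE")
	if err != nil {
		return ts // invalid key or value; ts is returned unchanged
	}
	// updated.String() now begins with "congo=t61rcWkgMzE".
	return updated
}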
// Delete returns a copy of the TraceState with the list-member identified by
// key removed.
func (ts TraceState) Delete(key string) TraceState {
members := make([]member, ts.Len())
copy(members, ts.list)
for i, member := range ts.list {
if member.Key == key {
members = append(members[:i], members[i+1:]...)
// TraceState should contain no duplicate members.
break
}
}
return TraceState{list: members}
}
// Len returns the number of list-members in the TraceState.
func (ts TraceState) Len() int {
return len(ts.list)
}