// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"go.opentelemetry.io/otel/trace"
)
// Filter determines if a measurement should be offered.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
type Filter func(context.Context) bool
// TraceBasedFilter is a [Filter] that will only offer measurements
// if the passed context associated with the measurement contains a sampled
// [go.opentelemetry.io/otel/trace.SpanContext].
func TraceBasedFilter(ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}
// AlwaysOnFilter is a [Filter] that always offers measurements.
func AlwaysOnFilter(context.Context) bool {
return true
}
// AlwaysOffFilter is a [Filter] that never offers measurements.
func AlwaysOffFilter(context.Context) bool {
return false
}
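// The function below is an illustrative sketch, not part of the original
// file: it shows how TraceBasedFilter reacts to a context, returning true
// only when the context carries a sampled SpanContext. The trace and span
// IDs are placeholder values.
func exampleTraceBasedFilter() {
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		TraceID:    trace.TraceID{0x01},
		SpanID:     trace.SpanID{0x01},
		TraceFlags: trace.FlagsSampled, // mark the span as sampled
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)
	_ = TraceBasedFilter(ctx)                  // true: ctx has a sampled span
	_ = TraceBasedFilter(context.Background()) // false: no span in ctx
}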
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"math"
"math/rand/v2"
"time"
"go.opentelemetry.io/otel/attribute"
)
// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func(attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or fewer measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurements with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
}
var _ Reservoir = &FixedSizeReservoir{}
// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
// there are k or fewer measurements made, the Reservoir will sample each one.
// If there are more than k, the Reservoir will then randomly sample all
// additional measurements with a decreasing probability.
type FixedSizeReservoir struct {
*storage
// count is the number of measurements seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next value of next.
w float64
}
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
}
r.reset()
return r
}
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (*FixedSizeReservoir) randomFloat64() float64 {
// TODO: Use an algorithm that avoids rejection sampling. For example:
//
// const precision = 1 << 53 // 2^53
// // Generate an integer in [1, 2^53 - 1]
// v := rand.Uint64() % (precision - 1) + 1
// return float64(v) / float64(precision)
f := rand.Float64()
for f == 0 {
f = rand.Float64()
}
return f
}
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The n and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
//
// A high-level overview of "Algorithm L":
// 0) Pre-calculate the random count greater than the storage size when
// an exemplar will be replaced.
// 1) Accept all measurements offered until the configured storage size is
// reached.
// 2) Loop:
// a) When the pre-calculated count is reached, replace a random
// existing exemplar with the offered measurement.
// b) Calculate the next random count greater than the existing one
// which will replace another exemplar.
//
// The way a "replacement" count is computed is by looking at `n` number of
// independent random numbers each corresponding to an offered measurement.
// Of these numbers the smallest `k` (the same size as the storage
// capacity) of them are kept as a subset. The maximum value in this
// subset, called `w` is used to weight another random number generation
// for the next count that will be considered.
//
// By weighting the next count computation as described, it is able to
// perform a uniformly-weighted sampling algorithm based on the number of
// samples the reservoir has seen so far. The sampling will "slow down" as
// more and more samples are offered so as to reduce a bias towards those
// offered just prior to the end of the collection.
//
// This algorithm is preferred because of its balance of simplicity and
// performance. It will compute three random numbers (the bulk of
// computation time) for each item that becomes part of the reservoir, but
// it does not spend any time on items that do not. In particular it has an
// asymptotic runtime of O(k(1 + log(n/k))) where n is the number of
// measurements offered and k is the reservoir size.
//
// See https://en.wikipedia.org/wiki/Reservoir_sampling for an overview of
// this and other reservoir sampling algorithms. See
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.
if int(r.count) < cap(r.store) {
r.store[r.count] = newMeasurement(ctx, t, n, a)
} else if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
r.count++
}
// reset resets r to the initial state.
func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.store))
// Initial random number in the series used to generate r.next.
//
// This is set before r.advance to reset or initialize the random number
// series. Without doing so it would always be 0 or never restart a new
// random number series.
//
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
r.advance()
}
// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
// numbers (i.e. `w = max(u_1,u_2,...,u_k)` for `k` equal to the capacity
// of the storage and each `u` in the interval (0,w)). To calculate the
// next r.w we use the fact that when the next exemplar is selected to be
// included in the storage an existing one will be dropped, and the
// corresponding random number in the set used to calculate r.w will also
// be replaced. The replacement random number will also be within (0,w),
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
// Given 0 < r.w < 1, each iteration will result in subsequent r.w being
// smaller. This translates here into the next value of next being selected
// against a distribution with a higher mean (i.e. the expected value will
// increase and replacements become less likely).
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
}
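// The function below is an illustrative usage sketch, not part of the
// original file: a FixedSizeReservoir of capacity 4 stores the first 4
// offered measurements directly and then randomly replaces stored exemplars,
// so Collect returns at most 4 exemplars no matter how many measurements
// were offered.
func exampleFixedSizeReservoir() []Exemplar {
	r := NewFixedSizeReservoir(4)
	ctx := context.Background()
	for i := 0; i < 100; i++ {
		// Offer 100 measurements; a uniform random sample of 4 is retained.
		r.Offer(ctx, time.Now(), NewValue(int64(i)), nil)
	}
	var dest []Exemplar
	r.Collect(&dest) // len(dest) == 4
	return dest
}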
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"slices"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
)
// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func(attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}
// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are defined by bounds.
//
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}
var _ Reservoir = &HistogramReservoir{}
// HistogramReservoir is a [Reservoir] that samples the last measurement that
// falls within a histogram bucket. The histogram bucket upper-boundaries are
// defined by bounds.
type HistogramReservoir struct {
*storage
// bounds are bucket bounds in ascending order.
bounds []float64
}
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}
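// The function below is an illustrative usage sketch, not part of the
// original file: with bounds [0, 5, 10] the reservoir keeps one exemplar per
// bucket (-∞, 0], (0, 5], (5, 10], (10, +∞), always retaining the last
// measurement that fell into each bucket.
func exampleHistogramReservoir() []Exemplar {
	r := NewHistogramReservoir([]float64{0, 5, 10})
	ctx := context.Background()
	r.Offer(ctx, time.Now(), NewValue(3.0), nil)  // stored in the (0, 5] bucket
	r.Offer(ctx, time.Now(), NewValue(4.0), nil)  // overwrites the (0, 5] exemplar
	r.Offer(ctx, time.Now(), NewValue(42.0), nil) // stored in the (10, +∞) bucket
	var dest []Exemplar
	r.Collect(&dest) // two exemplars: values 4.0 and 42.0
	return dest
}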
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// storage is an exemplar storage for [Reservoir] implementations.
type storage struct {
// store are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
// require an allocation for trace and span IDs in the hot path of Offer.
store []measurement
}
func newStorage(n int) *storage {
return &storage{store: make([]measurement, n)}
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *storage) Collect(dest *[]Exemplar) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for _, m := range r.store {
if !m.valid {
continue
}
m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
}
// measurement is a measurement made by a telemetry system.
type measurement struct {
// FilteredAttributes are the attributes dropped during the measurement.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was made.
Time time.Time
// Value is the value of the measurement.
Value Value
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext
valid bool
}
// newMeasurement returns a new non-empty measurement.
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
return measurement{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
}
}
// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}
if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
}
}
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import "math"
// ValueType identifies the type of value used in exemplar data.
type ValueType uint8
const (
// UnknownValueType should not be used. It represents a misconfigured
// Value.
UnknownValueType ValueType = 0
// Int64ValueType represents a Value with int64 data.
Int64ValueType ValueType = 1
// Float64ValueType represents a Value with float64 data.
Float64ValueType ValueType = 2
)
// Value is the value of data held by an exemplar.
type Value struct {
t ValueType
val uint64
}
// NewValue returns a new [Value] for the provided value.
func NewValue[N int64 | float64](value N) Value {
switch v := any(value).(type) {
case int64:
// This can be later converted back to int64 (overflow not checked).
return Value{t: Int64ValueType, val: uint64(v)} // nolint:gosec
case float64:
return Value{t: Float64ValueType, val: math.Float64bits(v)}
}
return Value{}
}
// Type returns the [ValueType] of data held by v.
func (v Value) Type() ValueType { return v.t }
// Int64 returns the value of v as an int64. If the ValueType of v is not an
// Int64ValueType, 0 is returned.
func (v Value) Int64() int64 {
if v.t == Int64ValueType {
// Assumes the correct int64 was stored in v.val based on type.
return int64(v.val) // nolint: gosec
}
return 0
}
// Float64 returns the value of v as a float64. If the ValueType of v is not
// a Float64ValueType, 0 is returned.
func (v Value) Float64() float64 {
if v.t == Float64ValueType {
return math.Float64frombits(v.val)
}
return 0
}
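// The function below is an illustrative sketch, not part of the original
// file: it shows how Value round-trips the two supported types. The stored
// bits are reinterpreted based on the recorded ValueType, and the accessor
// for the wrong type returns 0.
func exampleValue() {
	i := NewValue(int64(42))
	_ = i.Type()    // Int64ValueType
	_ = i.Int64()   // 42
	_ = i.Float64() // 0: wrong-type accessor
	f := NewValue(3.14)
	_ = f.Type()    // Float64ValueType
	_ = f.Float64() // 3.14
}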
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// now is used to return the current local time while allowing tests to
// override the default time.Now function.
var now = time.Now
// Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
// ComputeAggregation stores the aggregate of measurements into dest and
// returns the number of aggregate data-points output.
type ComputeAggregation func(dest *metricdata.Aggregation) int
// Builder builds an aggregate function.
type Builder[N int64 | float64] struct {
// Temporality is the temporality used for the returned aggregate function.
//
// If this is not provided, a default of cumulative will be used (except for
// the last-value aggregate function where delta is the only appropriate
// temporality).
Temporality metricdata.Temporality
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for each newly seen attribute set.
//
// If this is not provided, a default factory function that returns a
// dropReservoir reservoir will be used.
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
// attribute.
//
// If AggregationLimit is less than or equal to zero there will not be an
// aggregation limit imposed (i.e. unlimited attribute sets).
AggregationLimit int
}
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
return dropReservoir
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, dropped := a.Filter(fltr)
f(ctx, n, fAttr, dropped)
}
}
return func(ctx context.Context, n N, a attribute.Set) {
f(ctx, n, a, nil)
}
}
// LastValue returns a last-value aggregate function input and output.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}
// PrecomputedLastValue returns a last-value aggregate function input and
// output. The aggregation returned from the returned ComputeAggregation
// function will always only return values from the previous collection cycle.
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(lv.measure), lv.delta
default:
return b.filter(lv.measure), lv.cumulative
}
}
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
default:
return b.filter(s.measure), s.cumulative
}
}
// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
default:
return b.filter(s.measure), s.cumulative
}
}
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(
boundaries []float64,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
default:
return b.filter(h.measure), h.cumulative
}
}
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(
maxSize, maxScale int32,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
default:
return b.filter(h.measure), h.cumulative
}
}
// reset ensures s has capacity and sets its length. If the capacity of s is
// too small, a new slice is returned with the specified capacity and length.
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}
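// The function below is an illustrative sketch, not part of the original
// file: it shows how a Builder is used to produce the Measure input function
// and the ComputeAggregation output function for a monotonic sum. The
// attribute key "host" and the limit of 10 are illustrative values only.
func exampleBuilderSum() {
	b := Builder[int64]{
		Temporality:      metricdata.CumulativeTemporality,
		AggregationLimit: 10,
	}
	measure, compute := b.Sum(true) // monotonic sum
	measure(context.Background(), 5, attribute.NewSet(attribute.String("host", "a")))
	measure(context.Background(), 3, attribute.NewSet(attribute.String("host", "a")))
	var dest metricdata.Aggregation
	n := compute(&dest) // n == 1; dest now holds a metricdata.Sum[int64] with value 8
	_ = n
}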
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// dropReservoir returns a [FilteredExemplarReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}
type dropRes[N int64 | float64] struct{}
// Offer does nothing, all measurements offered will be dropped.
func (*dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (*dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
clear(*dest) // Erase elements to let GC collect objects
*dest = (*dest)[:0]
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
var exemplarPool = sync.Pool{
New: func() any { return new([]exemplar.Exemplar) },
}
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
defer func() {
clear(*dest) // Erase elements to let GC collect objects.
*dest = (*dest)[:0]
exemplarPool.Put(dest)
}()
*dest = reset(*dest, len(*out), cap(*out))
f(dest)
*out = reset(*out, len(*dest), cap(*dest))
for i, e := range *dest {
(*out)[i].FilteredAttributes = e.FilteredAttributes
(*out)[i].Time = e.Time
(*out)[i].SpanID = e.SpanID
(*out)[i].TraceID = e.TraceID
switch e.Value.Type() {
case exemplar.Int64ValueType:
(*out)[i].Value = N(e.Value.Int64())
case exemplar.Float64ValueType:
(*out)[i].Value = N(e.Value.Float64())
}
}
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"errors"
"math"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
const (
expoMaxScale = 20
expoMinScale = -10
smallestNonZeroNormalFloat64 = 0x1p-1022
// These redefine the Math constants with a type, so the compiler won't coerce
// them into an int on 32 bit platforms.
maxInt64 int64 = math.MaxInt64
minInt64 int64 = math.MinInt64
)
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
count uint64
min N
max N
sum N
maxSize int
noMinMax bool
noSum bool
scale int32
posBuckets expoBuckets
negBuckets expoBuckets
zeroCount uint64
}
func newExpoHistogramDataPoint[N int64 | float64](
attrs attribute.Set,
maxSize int,
maxScale int32,
noMinMax, noSum bool,
) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag
f := math.MaxFloat64
ma := N(f) // if N is int64, max will overflow to -9223372036854775808
mi := N(-f)
if N(maxInt64) > N(f) {
ma = N(maxInt64)
mi = N(minInt64)
}
return &expoHistogramDataPoint[N]{
attrs: attrs,
min: ma,
max: mi,
maxSize: maxSize,
noMinMax: noMinMax,
noSum: noSum,
scale: maxScale,
}
}
// record adds a new measurement to the histogram. It will rescale the buckets if needed.
func (p *expoHistogramDataPoint[N]) record(v N) {
p.count++
if !p.noMinMax {
if v < p.min {
p.min = v
}
if v > p.max {
p.max = v
}
}
if !p.noSum {
p.sum += v
}
absV := math.Abs(float64(v))
if float64(absV) == 0.0 {
p.zeroCount++
return
}
bin := p.getBin(absV)
bucket := &p.posBuckets
if v < 0 {
bucket = &p.negBuckets
}
// If the new bin would make the counts larger than maxSize, we need to
// downscale current measurements.
if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
if p.scale-scaleDelta < expoMinScale {
// With a scale of -10 there are only two buckets for the whole range of float64 values.
// This can only happen if there is a max size of 1.
otel.Handle(errors.New("exponential histogram scale underflow"))
return
}
// Downscale
p.scale -= scaleDelta
p.posBuckets.downscale(scaleDelta)
p.negBuckets.downscale(scaleDelta)
bin = p.getBin(absV)
}
bucket.record(bin)
}
// getBin returns the bin v should be recorded into.
func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
frac, expInt := math.Frexp(v)
// The exponent of a float64 fits in 11 bits, so this conversion is safe.
exp := int32(expInt) // nolint: gosec
if p.scale <= 0 {
// Because Frexp returns frac in [0.5, 1), exp is always 1 power of two higher than we want.
var correction int32 = 1
if frac == .5 {
// If v is an exact power of two the frac will be .5 and the exp
// will be one higher than we want.
correction = 2
}
return (exp - correction) >> (-p.scale)
}
return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1
}
// scaleFactors are constants used in calculating the logarithm index. They are
// equivalent to 2^index/log(2).
var scaleFactors = [21]float64{
math.Ldexp(math.Log2E, 0),
math.Ldexp(math.Log2E, 1),
math.Ldexp(math.Log2E, 2),
math.Ldexp(math.Log2E, 3),
math.Ldexp(math.Log2E, 4),
math.Ldexp(math.Log2E, 5),
math.Ldexp(math.Log2E, 6),
math.Ldexp(math.Log2E, 7),
math.Ldexp(math.Log2E, 8),
math.Ldexp(math.Log2E, 9),
math.Ldexp(math.Log2E, 10),
math.Ldexp(math.Log2E, 11),
math.Ldexp(math.Log2E, 12),
math.Ldexp(math.Log2E, 13),
math.Ldexp(math.Log2E, 14),
math.Ldexp(math.Log2E, 15),
math.Ldexp(math.Log2E, 16),
math.Ldexp(math.Log2E, 17),
math.Ldexp(math.Log2E, 18),
math.Ldexp(math.Log2E, 19),
math.Ldexp(math.Log2E, 20),
}
// scaleChange returns the magnitude of the scale change needed to fit bin in
// the bucket. If no scale change is needed 0 is returned.
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
if length == 0 {
// No need to rescale if there are no buckets.
return 0
}
low := int(startBin)
high := int(bin)
if startBin >= bin {
low = int(bin)
high = int(startBin) + length - 1
}
var count int32
for high-low >= p.maxSize {
low >>= 1
high >>= 1
count++
if count > expoMaxScale-expoMinScale {
return count
}
}
return count
}
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int32
counts []uint64
}
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.startBin = bin
return
}
endBin := int(b.startBin) + len(b.counts) - 1
// if the new bin is inside the current range
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
return
}
// if the new bin is before the current start add spaces to the counts
if bin < b.startBin {
origLen := len(b.counts)
newLength := endBin - int(bin) + 1
shift := b.startBin - bin
if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
}
copy(b.counts[shift:origLen+int(shift)], b.counts)
b.counts = b.counts[:newLength]
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
}
b.startBin = bin
b.counts[0] = 1
return
}
// if the new bin is after the end, add spaces to the end
if int(bin) > endBin {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
}
b.counts[bin-b.startBin] = 1
return
}
end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
}
}
// downscale shrinks a bucket by a factor of 2^delta. It will sum counts into the
// correct lower resolution bucket.
func (b *expoBuckets) downscale(delta int32) {
// Example
// delta = 2
// Original offset: -6
// Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4
// new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1
// new Offset: -2
// new Counts: [4, 14, 30, 10]
if len(b.counts) <= 1 || delta < 1 {
b.startBin >>= delta
return
}
steps := int32(1) << delta
offset := b.startBin % steps
offset = (offset + steps) % steps // to make offset positive
for i := 1; i < len(b.counts); i++ {
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
continue
}
b.counts[idx/int(steps)] += b.counts[i]
}
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
b.counts = b.counts[:lastIdx+1]
b.startBin >>= delta
}
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](
maxSize, maxScale int32,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: maxScale,
newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
start: now(),
}
}
// expoHistogram summarizes a set of measurements as a histogram with exponentially
// defined buckets.
type expoHistogram[N int64 | float64] struct {
noSum bool
noMinMax bool
maxSize int
maxScale int32
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
start time.Time
}
func (e *expoHistogram[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
}
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
attr := e.limit.Attributes(fltrAttr, e.values)
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes(attr)
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, value, droppedAttr)
}
func (e *expoHistogram[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
// In that case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.ExponentialHistogram[N])
h.Temporality = metricdata.DeltaTemporality
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
n := len(e.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(
hDPts[i].PositiveBucket.Counts,
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
}
// Unused attribute sets do not report.
clear(e.values)
e.start = t
h.DataPoints = hDPts
*dest = h
return n
}
func (e *expoHistogram[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
// In that case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.ExponentialHistogram[N])
h.Temporality = metricdata.CumulativeTemporality
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
n := len(e.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range e.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(
hDPts[i].PositiveBucket.Counts,
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
hDPts[i].NegativeBucket.Counts,
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
if !e.noSum {
hDPts[i].Sum = val.sum
}
if !e.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
h.DataPoints = hDPts
*dest = h
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](
f exemplar.Filter,
r exemplar.Reservoir,
) FilteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
}
}
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
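// The function below is an illustrative sketch, not part of the original
// file: it composes the exported exemplar package pieces, placing a
// trace-based filter in front of a fixed-size reservoir so only measurements
// made with a sampled span are offered to the reservoir. The capacity of 4
// is an illustrative value.
func exampleFilteredExemplarReservoir() FilteredExemplarReservoir[float64] {
	res := NewFilteredExemplarReservoir[float64](
		exemplar.TraceBasedFilter,
		exemplar.NewFixedSizeReservoir(4),
	)
	// Without a sampled span in the context this offer is filtered out.
	res.Offer(context.Background(), 1.5, nil)
	return res
}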
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"slices"
"sort"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
counts []uint64
count uint64
total N
min, max N
}
// newBuckets returns buckets with n bins.
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
}
func (b *buckets[N]) sum(value N) { b.total += value }
func (b *buckets[N]) bin(idx int) {
b.counts[idx]++
b.count++
}
func (b *buckets[N]) minMax(value N) {
if value < b.min {
b.min = value
} else if value > b.max {
b.max = value
}
}
// histValues summarizes a set of measurements as a histogram with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
noMinMax bool
noSum bool
bounds []float64
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](
bounds []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := slices.Clone(bounds)
slices.Sort(b)
return &histValues[N]{
noMinMax: noMinMax,
noSum: noSum,
bounds: b,
newRes: r,
limit: newLimiter[*buckets[N]](limit),
values: make(map[attribute.Distinct]*buckets[N]),
}
}
// measure records the measurement value, scoped by the filtered attributes,
// and aggregates it into a histogram.
func (s *histValues[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
b, ok := s.values[attr.Equivalent()]
if !ok {
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes(attr)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
s.values[attr.Equivalent()] = b
}
b.bin(idx)
if !s.noMinMax {
b.minMax(value)
}
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, value, droppedAttr)
}
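// The function below is an illustrative sketch, not part of the original
// file: it shows the bucket-index mapping used by measure above.
// sort.SearchFloat64s returns the smallest index whose bound is >= the
// value, so a value maps into one of len(bounds)+1 buckets.
func exampleBucketIndex() {
	bounds := []float64{0, 5, 10}
	_ = sort.SearchFloat64s(bounds, -1) // 0: bucket (-∞, 0]
	_ = sort.SearchFloat64s(bounds, 3)  // 1: bucket (0, 5]
	_ = sort.SearchFloat64s(bounds, 10) // 2: bucket (5, 10]
	_ = sort.SearchFloat64s(bounds, 11) // 3: bucket (10, +∞)
}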
// newHistogram returns an Aggregator that summarizes a set of measurements as
// a histogram.
func newHistogram[N int64 | float64](
boundaries []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r),
start: now(),
}
}
// histogram summarizes a set of measurements as a histogram with explicitly
// defined buckets.
type histogram[N int64 | float64] struct {
*histValues[N]
start time.Time
}
func (s *histogram[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.DeltaTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = val.counts
if !s.noSum {
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
}
// Unused attribute sets do not report.
clear(s.values)
// The delta collection cycle resets.
s.start = t
h.DataPoints = hDPts
*dest = h
return n
}
func (s *histogram[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.CumulativeTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
//
// TODO (#3047): Making copies for bounds and counts incurs a large
// memory allocation footprint. Alternatives should be explored.
hDPts[i].BucketCounts = slices.Clone(val.counts)
if !s.noSum {
hDPts[i].Sum = val.total
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
h.DataPoints = hDPts
*dest = h
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// datapoint is the recorded measurement data for an attribute set.
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res FilteredExemplarReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]datapoint[N]),
start: now(),
}
}
// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
}
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
s.Lock()
defer s.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr.Equivalent()]
if !ok {
d.res = s.newRes(attr)
}
d.attrs = attr
d.value = value
d.res.Offer(ctx, value, droppedAttr)
s.values[attr.Equivalent()] = d
}
func (s *lastValue[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *lastValue[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData
return n
}
// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)
var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
}
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}
// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
}
func (s *precomputedLastValue[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *precomputedLastValue[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import "go.opentelemetry.io/otel/attribute"
// overflowSet is the attribute set used to record a measurement when adding
// another distinct attribute set to the aggregate would exceed the aggregate
// limit.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
// limiter limits aggregate values.
type limiter[V any] struct {
// aggLimit is the maximum number of metric streams that can be aggregated.
//
// Any metric stream with attributes distinct from any set already
// aggregated, once the aggLimit has been met, will instead be aggregated
// into an "overflow" metric stream. That stream will only contain the
// "otel.metric.overflow"=true attribute.
aggLimit int
}
// newLimiter returns a new Limiter with the provided aggregation limit.
func newLimiter[V any](aggregation int) limiter[V] {
return limiter[V]{aggLimit: aggregation}
}
// Attributes checks if adding a measurement for attrs will exceed the
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attrs is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
if l.aggLimit > 0 {
_, exists := measurements[attrs.Equivalent()]
if !exists && len(measurements) >= l.aggLimit-1 {
return overflowSet
}
}
return attrs
}
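// The function below is an illustrative sketch, not part of the original
// file: with an aggregation limit of 3, two distinct attribute sets are
// accepted and any further distinct set is redirected to the overflow set.
// The keys "a", "b", and "c" are illustrative only.
func exampleLimiter() {
	lim := newLimiter[int](3)
	seen := map[attribute.Distinct]int{}
	a := lim.Attributes(attribute.NewSet(attribute.String("a", "1")), seen)
	seen[a.Equivalent()] = 1
	b := lim.Attributes(attribute.NewSet(attribute.String("b", "1")), seen)
	seen[b.Equivalent()] = 1
	// len(seen) is now aggLimit-1, so the next new distinct set overflows.
	c := lim.Attributes(attribute.NewSet(attribute.String("c", "1")), seen)
	_ = c // c == overflowSet
}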
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type sumValue[N int64 | float64] struct {
n N
res FilteredExemplarReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}
func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
values: make(map[attribute.Distinct]sumValue[N]),
}
}
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
s.Lock()
defer s.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
v, ok := s.values[attr.Equivalent()]
if !ok {
v.res = s.newRes(attr)
}
v.attrs = attr
v.n += value
v.res.Offer(ctx, value, droppedAttr)
s.values[attr.Equivalent()] = v
}
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
start: now(),
}
}
// sum summarizes a set of measurements made as their arithmetic sum.
type sum[N int64 | float64] struct {
*valueMap[N]
monotonic bool
start time.Time
}
func (s *sum[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
s.Lock()
defer s.Unlock()
n := len(s.values)
dPts := reset(sData.DataPoints, n, n)
var i int
for _, val := range s.values {
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
i++
}
// Do not report stale values.
clear(s.values)
// The delta collection cycle resets.
s.start = t
sData.DataPoints = dPts
*dest = sData
return n
}
func (s *sum[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
s.Lock()
defer s.Unlock()
n := len(s.values)
dPts := reset(sData.DataPoints, n, n)
var i int
for _, value := range s.values {
dPts[i].Attributes = value.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = value.n
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
i++
}
sData.DataPoints = dPts
*dest = sData
return n
}
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](
monotonic bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
start: now(),
}
}
// precomputedSum summarizes a set of observations as their arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*valueMap[N]
monotonic bool
start time.Time
reported map[attribute.Distinct]N
}
func (s *precomputedSum[N]) delta(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
newReported := make(map[attribute.Distinct]N)
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
s.Lock()
defer s.Unlock()
n := len(s.values)
dPts := reset(sData.DataPoints, n, n)
var i int
for key, value := range s.values {
delta := value.n - s.reported[key]
dPts[i].Attributes = value.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = delta
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
newReported[key] = value.n
i++
}
// Unused attribute sets do not report.
clear(s.values)
s.reported = newReported
// The delta collection cycle resets.
s.start = t
sData.DataPoints = dPts
*dest = sData
return n
}
func (s *precomputedSum[N]) cumulative(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
s.Lock()
defer s.Unlock()
n := len(s.values)
dPts := reset(sData.DataPoints, n, n)
var i int
for _, val := range s.values {
dPts[i].Attributes = val.attrs
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
i++
}
// Unused attribute sets do not report.
clear(s.values)
sData.DataPoints = dPts
*dest = sData
return n
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package metricdata provides types for the metric SDK data model.
package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
import (
"encoding/json"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
)
// ResourceMetrics is a collection of ScopeMetrics and the associated Resource
// that created them.
type ResourceMetrics struct {
// Resource represents the entity that collected the metrics.
Resource *resource.Resource
// ScopeMetrics are the collection of metrics with unique Scopes.
ScopeMetrics []ScopeMetrics
}
// ScopeMetrics is a collection of Metrics produced by a Meter.
type ScopeMetrics struct {
// Scope is the Scope that the Meter was created with.
Scope instrumentation.Scope
// Metrics are a list of aggregations created by the Meter.
Metrics []Metrics
}
// Metrics is a collection of one or more aggregated timeseries from an Instrument.
type Metrics struct {
// Name is the name of the Instrument that created this data.
Name string
// Description is the description of the Instrument, which can be used in documentation.
Description string
// Unit is the unit in which the Instrument reports.
Unit string
// Data is the aggregated data from an Instrument.
Data Aggregation
}
// Aggregation is the store of data reported by an Instrument.
// It will be one of: Gauge, Sum, Histogram, ExponentialHistogram.
type Aggregation interface {
privateAggregation()
}
// Gauge represents a measurement of the current value of an instrument.
type Gauge[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []DataPoint[N]
}
func (Gauge[N]) privateAggregation() {}
// Sum represents the sum of all measurements of values from an instrument.
type Sum[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []DataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
// IsMonotonic represents if this aggregation only increases or decreases.
IsMonotonic bool
}
func (Sum[N]) privateAggregation() {}
// DataPoint is a single data point in a timeseries.
type DataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started. (optional)
StartTime time.Time `json:",omitempty"`
// Time is the time when the timeseries was recorded. (optional)
Time time.Time `json:",omitempty"`
// Value is the value of this data point.
Value N
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
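// The function below is an illustrative sketch, not part of the original
// file: it assembles a Gauge aggregation by hand to show the shape of data
// handed to exporters. The attribute key "host" and the value 42 are
// illustrative only.
func exampleGauge() Gauge[int64] {
	return Gauge[int64]{
		DataPoints: []DataPoint[int64]{
			{
				Attributes: attribute.NewSet(attribute.String("host", "a")),
				Time:       time.Now(),
				Value:      42,
			},
		},
	}
}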
// Histogram represents the histogram of all measurements of values from an instrument.
type Histogram[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// Attributes.
DataPoints []HistogramDataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}
func (Histogram[N]) privateAggregation() {}
// HistogramDataPoint is a single histogram data point in a timeseries.
type HistogramDataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this histogram has been calculated with.
Count uint64
// Bounds are the upper bounds of the buckets of the histogram. Because the
// last boundary is +infinity it is implied and not included in Bounds.
Bounds []float64
// BucketCounts is the count of each of the buckets.
BucketCounts []uint64
// Min is the minimum value recorded. (optional)
Min Extrema[N]
// Max is the maximum value recorded. (optional)
Max Extrema[N]
// Sum is the sum of the values recorded.
Sum N
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
// ExponentialHistogram represents the histogram of all measurements of values from an instrument.
type ExponentialHistogram[N int64 | float64] struct {
// DataPoints are the individual aggregated measurements with unique
// attributes.
DataPoints []ExponentialHistogramDataPoint[N]
// Temporality describes if the aggregation is reported as the change from the
// last report time, or the cumulative changes since a fixed start time.
Temporality Temporality
}
func (ExponentialHistogram[N]) privateAggregation() {}
// ExponentialHistogramDataPoint is a single exponential histogram data point in a timeseries.
type ExponentialHistogramDataPoint[N int64 | float64] struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this histogram has been calculated with.
Count uint64
// Min is the minimum value recorded. (optional)
Min Extrema[N]
// Max is the maximum value recorded. (optional)
Max Extrema[N]
// Sum is the sum of the values recorded.
Sum N
// Scale describes the resolution of the histogram. Boundaries are
// located at powers of the base, where:
//
// base = 2 ^ (2 ^ -Scale)
Scale int32
// ZeroCount is the number of values whose absolute value
// is less than or equal to [ZeroThreshold].
// When ZeroThreshold is 0, this is the number of values that
// cannot be expressed using the standard exponential formula
// as well as values that have been rounded to zero.
// ZeroCount represents the special zero count bucket.
ZeroCount uint64
// PositiveBucket is the range of positive value bucket counts.
PositiveBucket ExponentialBucket
// NegativeBucket is the range of negative value bucket counts.
NegativeBucket ExponentialBucket
// ZeroThreshold is the width of the zero region. Where the zero region is
// defined as the closed interval [-ZeroThreshold, ZeroThreshold].
ZeroThreshold float64
// Exemplars is the sampled Exemplars collected during the timeseries.
Exemplars []Exemplar[N] `json:",omitempty"`
}
// ExponentialBucket is a set of bucket counts, encoded in a contiguous array
// of counts.
type ExponentialBucket struct {
// Offset is the bucket index of the first entry in the Counts slice.
Offset int32
// Counts is a slice where Counts[i] carries the count of the bucket at
// index (Offset+i). Counts[i] is the count of values greater than
// base^(Offset+i) and less than or equal to base^(Offset+i+1).
Counts []uint64
}
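// A minimal sketch of recovering bucket boundaries from Scale and Offset,
// using the relation base = 2^(2^-Scale) described above (bucketBounds is a
// hypothetical helper, not part of this package):
//
//	func bucketBounds(scale, offset int32, i int) (lower, upper float64) {
//		base := math.Exp2(math.Exp2(-float64(scale)))
//		idx := float64(offset) + float64(i)
//		return math.Pow(base, idx), math.Pow(base, idx+1)
//	}
//
// For example, with Scale = 0 (base = 2) and Offset = 0, Counts[0] covers
// (1, 2] and Counts[1] covers (2, 4].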
// Extrema is the minimum or maximum value of a dataset.
type Extrema[N int64 | float64] struct {
value N
valid bool
}
// MarshalText converts the Extrema value to text.
func (e Extrema[N]) MarshalText() ([]byte, error) {
if !e.valid {
return json.Marshal(nil)
}
return json.Marshal(e.value)
}
// MarshalJSON converts the Extrema value to a JSON number.
func (e *Extrema[N]) MarshalJSON() ([]byte, error) {
return e.MarshalText()
}
// NewExtrema returns an Extrema set to v.
func NewExtrema[N int64 | float64](v N) Extrema[N] {
return Extrema[N]{value: v, valid: true}
}
// Value returns the Extrema value and true if the Extrema is defined.
// Otherwise, if the Extrema is its zero-value, defined will be false.
func (e Extrema[N]) Value() (v N, defined bool) {
return e.value, e.valid
}
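// A minimal usage sketch: a zero-value Extrema reports defined == false,
// while one created with NewExtrema reports its value:
//
//	var unset metricdata.Extrema[float64]
//	if _, defined := unset.Value(); !defined {
//		// no minimum or maximum was recorded
//	}
//	minimum := metricdata.NewExtrema(0.25)
//	if v, defined := minimum.Value(); defined {
//		_ = v // 0.25
//	}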
// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar[N int64 | float64] struct {
// FilteredAttributes are the attributes recorded with the measurement but
// filtered out of the timeseries' aggregated data.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was recorded.
Time time.Time
// Value is the measured value.
Value N
// SpanID is the ID of the span that was active during the measurement. If
// no span was active or the span was not sampled this will be empty.
SpanID []byte `json:",omitempty"`
// TraceID is the ID of the trace the active span belonged to during the
// measurement. If no span was active or the span was not sampled this will
// be empty.
TraceID []byte `json:",omitempty"`
}
// Summary metric data are used to convey quantile summaries,
// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary)
// data type.
//
// These data points cannot always be merged in a meaningful way. The Summary
// type is only used by bridges from other metrics libraries, and cannot be
// produced using OpenTelemetry instrumentation.
type Summary struct {
// DataPoints are the individual aggregated measurements with unique
// attributes.
DataPoints []SummaryDataPoint
}
func (Summary) privateAggregation() {}
// SummaryDataPoint is a single data point in a timeseries that describes the
// time-varying values of a Summary metric.
type SummaryDataPoint struct {
// Attributes is the set of key value pairs that uniquely identify the
// timeseries.
Attributes attribute.Set
// StartTime is when the timeseries was started.
StartTime time.Time
// Time is the time when the timeseries was recorded.
Time time.Time
// Count is the number of updates this summary has been calculated with.
Count uint64
// Sum is the sum of the values recorded.
Sum float64
// QuantileValues is the list of values at different quantiles of the
// distribution calculated from the current snapshot. The quantiles must be
// strictly increasing. (optional)
QuantileValues []QuantileValue
}
// QuantileValue is the value at a given quantile of a summary.
type QuantileValue struct {
// Quantile is the quantile of this value.
//
// Must be in the interval [0.0, 1.0].
Quantile float64
// Value is the value at the given quantile of a summary.
//
// Quantile values must NOT be negative.
Value float64
}
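// A minimal sketch of a Summary as a bridge from another metrics library
// might produce it (the attribute and the 0.5/0.99 quantiles are
// illustrative only):
//
//	s := metricdata.Summary{
//		DataPoints: []metricdata.SummaryDataPoint{{
//			Attributes: attribute.NewSet(attribute.String("path", "/")),
//			Time:       time.Now(),
//			Count:      10,
//			Sum:        12.5,
//			QuantileValues: []metricdata.QuantileValue{
//				{Quantile: 0.5, Value: 1.0},
//				{Quantile: 0.99, Value: 3.2},
//			},
//		}},
//	}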
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package metricdatatest provides testing functionality for use with the
// metricdata package.
package metricdatatest // import "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
import (
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// Datatypes are the concrete data-types the metricdata package provides.
type Datatypes interface {
metricdata.DataPoint[float64] |
metricdata.DataPoint[int64] |
metricdata.Gauge[float64] |
metricdata.Gauge[int64] |
metricdata.Histogram[float64] |
metricdata.Histogram[int64] |
metricdata.HistogramDataPoint[float64] |
metricdata.HistogramDataPoint[int64] |
metricdata.Extrema[int64] |
metricdata.Extrema[float64] |
metricdata.Metrics |
metricdata.ResourceMetrics |
metricdata.ScopeMetrics |
metricdata.Sum[float64] |
metricdata.Sum[int64] |
metricdata.Exemplar[float64] |
metricdata.Exemplar[int64] |
metricdata.ExponentialHistogram[float64] |
metricdata.ExponentialHistogram[int64] |
metricdata.ExponentialHistogramDataPoint[float64] |
metricdata.ExponentialHistogramDataPoint[int64] |
metricdata.ExponentialBucket |
metricdata.Summary |
metricdata.SummaryDataPoint |
metricdata.QuantileValue
// Interface types are not allowed in union types, therefore the
// Aggregation and Value type from metricdata are not included here.
}
// TestingT is an interface satisfied by [testing.T], but without the
// private method of [testing.TB], so other testing packages can rely on it
// as well.
// The methods in this interface must match the [testing.TB] interface.
type TestingT interface {
Helper()
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
Error(...any)
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
}
type config struct {
ignoreTimestamp bool
ignoreExemplars bool
ignoreValue bool
}
func newConfig(opts []Option) config {
var cfg config
for _, opt := range opts {
cfg = opt.apply(cfg)
}
return cfg
}
// Option allows for fine-grained control over how AssertEqual operates.
type Option interface {
apply(cfg config) config
}
type fnOption func(cfg config) config
func (fn fnOption) apply(cfg config) config {
return fn(cfg)
}
// IgnoreTimestamp disables checking if timestamps are different.
func IgnoreTimestamp() Option {
return fnOption(func(cfg config) config {
cfg.ignoreTimestamp = true
return cfg
})
}
// IgnoreExemplars disables checking if Exemplars are different.
func IgnoreExemplars() Option {
return fnOption(func(cfg config) config {
cfg.ignoreExemplars = true
return cfg
})
}
// IgnoreValue disables checking if values are different. This can be
// useful for non-deterministic values, like measured durations.
//
// This will ignore the value and trace information for Exemplars;
// the buckets, zero count, scale, sum, max, min, and counts of
// ExponentialHistogramDataPoints; the buckets, sum, count, max,
// and min of HistogramDataPoints; the value of DataPoints.
func IgnoreValue() Option {
return fnOption(func(cfg config) config {
cfg.ignoreValue = true
return cfg
})
}
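// A minimal test sketch combining AssertEqual with an option, written in a
// _test file that imports metricdata and metricdatatest; the values shown
// are illustrative only:
//
//	func TestCounterData(t *testing.T) {
//		want := metricdata.Sum[int64]{
//			Temporality: metricdata.CumulativeTemporality,
//			IsMonotonic: true,
//			DataPoints:  []metricdata.DataPoint[int64]{{Value: 3}},
//		}
//		got := metricdata.Sum[int64]{
//			Temporality: metricdata.CumulativeTemporality,
//			IsMonotonic: true,
//			DataPoints:  []metricdata.DataPoint[int64]{{Value: 3, Time: time.Now()}},
//		}
//		metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
//	}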
// AssertEqual asserts that the two concrete data-types from the metricdata
// package are equal.
func AssertEqual[T Datatypes](t TestingT, expected, actual T, opts ...Option) bool {
t.Helper()
cfg := newConfig(opts)
// Generic types cannot be type asserted. Use an interface instead.
aIface := any(actual)
var r []string
switch e := any(expected).(type) {
case metricdata.Exemplar[int64]:
r = equalExemplars(e, aIface.(metricdata.Exemplar[int64]), cfg)
case metricdata.Exemplar[float64]:
r = equalExemplars(e, aIface.(metricdata.Exemplar[float64]), cfg)
case metricdata.DataPoint[int64]:
r = equalDataPoints(e, aIface.(metricdata.DataPoint[int64]), cfg)
case metricdata.DataPoint[float64]:
r = equalDataPoints(e, aIface.(metricdata.DataPoint[float64]), cfg)
case metricdata.Gauge[int64]:
r = equalGauges(e, aIface.(metricdata.Gauge[int64]), cfg)
case metricdata.Gauge[float64]:
r = equalGauges(e, aIface.(metricdata.Gauge[float64]), cfg)
case metricdata.Histogram[float64]:
r = equalHistograms(e, aIface.(metricdata.Histogram[float64]), cfg)
case metricdata.Histogram[int64]:
r = equalHistograms(e, aIface.(metricdata.Histogram[int64]), cfg)
case metricdata.HistogramDataPoint[float64]:
r = equalHistogramDataPoints(e, aIface.(metricdata.HistogramDataPoint[float64]), cfg)
case metricdata.HistogramDataPoint[int64]:
r = equalHistogramDataPoints(e, aIface.(metricdata.HistogramDataPoint[int64]), cfg)
case metricdata.Extrema[int64]:
r = equalExtrema(e, aIface.(metricdata.Extrema[int64]), cfg)
case metricdata.Extrema[float64]:
r = equalExtrema(e, aIface.(metricdata.Extrema[float64]), cfg)
case metricdata.Metrics:
r = equalMetrics(e, aIface.(metricdata.Metrics), cfg)
case metricdata.ResourceMetrics:
r = equalResourceMetrics(e, aIface.(metricdata.ResourceMetrics), cfg)
case metricdata.ScopeMetrics:
r = equalScopeMetrics(e, aIface.(metricdata.ScopeMetrics), cfg)
case metricdata.Sum[int64]:
r = equalSums(e, aIface.(metricdata.Sum[int64]), cfg)
case metricdata.Sum[float64]:
r = equalSums(e, aIface.(metricdata.Sum[float64]), cfg)
case metricdata.ExponentialHistogram[float64]:
r = equalExponentialHistograms(e, aIface.(metricdata.ExponentialHistogram[float64]), cfg)
case metricdata.ExponentialHistogram[int64]:
r = equalExponentialHistograms(e, aIface.(metricdata.ExponentialHistogram[int64]), cfg)
case metricdata.ExponentialHistogramDataPoint[float64]:
r = equalExponentialHistogramDataPoints(e, aIface.(metricdata.ExponentialHistogramDataPoint[float64]), cfg)
case metricdata.ExponentialHistogramDataPoint[int64]:
r = equalExponentialHistogramDataPoints(e, aIface.(metricdata.ExponentialHistogramDataPoint[int64]), cfg)
case metricdata.ExponentialBucket:
r = equalExponentialBuckets(e, aIface.(metricdata.ExponentialBucket), cfg)
case metricdata.Summary:
r = equalSummary(e, aIface.(metricdata.Summary), cfg)
case metricdata.SummaryDataPoint:
r = equalSummaryDataPoint(e, aIface.(metricdata.SummaryDataPoint), cfg)
case metricdata.QuantileValue:
r = equalQuantileValue(e, aIface.(metricdata.QuantileValue), cfg)
default:
// We control all types passed to this; panic to signal developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown types: %T", expected))
}
if len(r) > 0 {
t.Error(r)
return false
}
return true
}
// AssertAggregationsEqual asserts that two Aggregations are equal.
func AssertAggregationsEqual(t TestingT, expected, actual metricdata.Aggregation, opts ...Option) bool {
t.Helper()
cfg := newConfig(opts)
if r := equalAggregations(expected, actual, cfg); len(r) > 0 {
t.Error(r)
return false
}
return true
}
// AssertHasAttributes asserts that all DataPoints or HistogramDataPoints have all passed attrs.
func AssertHasAttributes[T Datatypes](t TestingT, actual T, attrs ...attribute.KeyValue) bool {
t.Helper()
var reasons []string
switch e := any(actual).(type) {
case metricdata.Exemplar[int64]:
reasons = hasAttributesExemplars(e, attrs...)
case metricdata.Exemplar[float64]:
reasons = hasAttributesExemplars(e, attrs...)
case metricdata.DataPoint[int64]:
reasons = hasAttributesDataPoints(e, attrs...)
case metricdata.DataPoint[float64]:
reasons = hasAttributesDataPoints(e, attrs...)
case metricdata.Gauge[int64]:
reasons = hasAttributesGauge(e, attrs...)
case metricdata.Gauge[float64]:
reasons = hasAttributesGauge(e, attrs...)
case metricdata.Sum[int64]:
reasons = hasAttributesSum(e, attrs...)
case metricdata.Sum[float64]:
reasons = hasAttributesSum(e, attrs...)
case metricdata.HistogramDataPoint[int64]:
reasons = hasAttributesHistogramDataPoints(e, attrs...)
case metricdata.HistogramDataPoint[float64]:
reasons = hasAttributesHistogramDataPoints(e, attrs...)
case metricdata.Extrema[int64], metricdata.Extrema[float64]:
// Nothing to check.
case metricdata.Histogram[int64]:
reasons = hasAttributesHistogram(e, attrs...)
case metricdata.Histogram[float64]:
reasons = hasAttributesHistogram(e, attrs...)
case metricdata.Metrics:
reasons = hasAttributesMetrics(e, attrs...)
case metricdata.ScopeMetrics:
reasons = hasAttributesScopeMetrics(e, attrs...)
case metricdata.ResourceMetrics:
reasons = hasAttributesResourceMetrics(e, attrs...)
case metricdata.ExponentialHistogram[int64]:
reasons = hasAttributesExponentialHistogram(e, attrs...)
case metricdata.ExponentialHistogram[float64]:
reasons = hasAttributesExponentialHistogram(e, attrs...)
case metricdata.ExponentialHistogramDataPoint[int64]:
reasons = hasAttributesExponentialHistogramDataPoints(e, attrs...)
case metricdata.ExponentialHistogramDataPoint[float64]:
reasons = hasAttributesExponentialHistogramDataPoints(e, attrs...)
case metricdata.ExponentialBucket:
// Nothing to check.
case metricdata.Summary:
reasons = hasAttributesSummary(e, attrs...)
case metricdata.SummaryDataPoint:
reasons = hasAttributesSummaryDataPoint(e, attrs...)
case metricdata.QuantileValue:
// Nothing to check.
default:
// We control all types passed to this; panic to signal developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown types: %T", actual))
}
if len(reasons) > 0 {
t.Error(reasons)
return false
}
return true
}
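// A minimal usage sketch from within a test (the "user" attribute is
// illustrative only):
//
//	dp := metricdata.DataPoint[int64]{
//		Attributes: attribute.NewSet(attribute.String("user", "alice")),
//		Value:      1,
//	}
//	metricdatatest.AssertHasAttributes(t, dp, attribute.String("user", "alice"))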
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metricdatatest // import "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
import (
"bytes"
"fmt"
"reflect"
"slices"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// equalResourceMetrics returns reasons ResourceMetrics are not equal. If they
// are equal, the returned reasons will be empty.
//
// The ScopeMetrics each ResourceMetrics contains are compared based on
// containing the same ScopeMetrics, not the order they are stored in.
func equalResourceMetrics(a, b metricdata.ResourceMetrics, cfg config) (reasons []string) {
if !a.Resource.Equal(b.Resource) {
reasons = append(reasons, notEqualStr("Resources", a.Resource, b.Resource))
}
r := compareDiff(diffSlices(
a.ScopeMetrics,
b.ScopeMetrics,
func(a, b metricdata.ScopeMetrics) bool {
r := equalScopeMetrics(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "ResourceMetrics ScopeMetrics not equal:\n"+r)
}
return reasons
}
// equalScopeMetrics returns reasons ScopeMetrics are not equal. If they are
// equal, the returned reasons will be empty.
//
// The Metrics each ScopeMetrics contains are compared based on containing the
// same Metrics, not the order they are stored in.
func equalScopeMetrics(a, b metricdata.ScopeMetrics, cfg config) (reasons []string) {
if a.Scope != b.Scope {
reasons = append(reasons, notEqualStr("Scope", a.Scope, b.Scope))
}
r := compareDiff(diffSlices(
a.Metrics,
b.Metrics,
func(a, b metricdata.Metrics) bool {
r := equalMetrics(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "ScopeMetrics Metrics not equal:\n"+r)
}
return reasons
}
// equalMetrics returns reasons Metrics are not equal. If they are equal, the
// returned reasons will be empty.
func equalMetrics(a, b metricdata.Metrics, cfg config) (reasons []string) {
if a.Name != b.Name {
reasons = append(reasons, notEqualStr("Name", a.Name, b.Name))
}
if a.Description != b.Description {
reasons = append(reasons, notEqualStr("Description", a.Description, b.Description))
}
if a.Unit != b.Unit {
reasons = append(reasons, notEqualStr("Unit", a.Unit, b.Unit))
}
r := equalAggregations(a.Data, b.Data, cfg)
if len(r) > 0 {
reasons = append(reasons, "Metrics Data not equal:")
reasons = append(reasons, r...)
}
return reasons
}
// equalAggregations returns reasons a and b are not equal. If they are equal,
// the returned reasons will be empty.
func equalAggregations(a, b metricdata.Aggregation, cfg config) (reasons []string) {
if a == nil || b == nil {
if a != b {
return []string{notEqualStr("Aggregation", a, b)}
}
return reasons
}
if reflect.TypeOf(a) != reflect.TypeOf(b) {
return []string{fmt.Sprintf("Aggregation types not equal:\nexpected: %T\nactual: %T", a, b)}
}
switch v := a.(type) {
case metricdata.Gauge[int64]:
r := equalGauges(v, b.(metricdata.Gauge[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Gauge[int64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Gauge[float64]:
r := equalGauges(v, b.(metricdata.Gauge[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Gauge[float64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Sum[int64]:
r := equalSums(v, b.(metricdata.Sum[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Sum[int64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Sum[float64]:
r := equalSums(v, b.(metricdata.Sum[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Sum[float64] not equal:")
reasons = append(reasons, r...)
}
case metricdata.Histogram[int64]:
r := equalHistograms(v, b.(metricdata.Histogram[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Histogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.Histogram[float64]:
r := equalHistograms(v, b.(metricdata.Histogram[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "Histogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.ExponentialHistogram[int64]:
r := equalExponentialHistograms(v, b.(metricdata.ExponentialHistogram[int64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "ExponentialHistogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.ExponentialHistogram[float64]:
r := equalExponentialHistograms(v, b.(metricdata.ExponentialHistogram[float64]), cfg)
if len(r) > 0 {
reasons = append(reasons, "ExponentialHistogram not equal:")
reasons = append(reasons, r...)
}
case metricdata.Summary:
r := equalSummary(v, b.(metricdata.Summary), cfg)
if len(r) > 0 {
reasons = append(reasons, "Summary not equal:")
reasons = append(reasons, r...)
}
default:
reasons = append(reasons, fmt.Sprintf("Aggregation of unknown types %T", a))
}
return reasons
}
// equalGauges returns reasons Gauges are not equal. If they are equal, the
// returned reasons will be empty.
//
// The DataPoints each Gauge contains are compared based on containing the
// same DataPoints, not the order they are stored in.
func equalGauges[N int64 | float64](a, b metricdata.Gauge[N], cfg config) (reasons []string) {
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.DataPoint[N]) bool {
r := equalDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Gauge DataPoints not equal:\n"+r)
}
return reasons
}
// equalSums returns reasons Sums are not equal. If they are equal, the
// returned reasons will be empty.
//
// The DataPoints each Sum contains are compared based on containing the same
// DataPoints, not the order they are stored in.
func equalSums[N int64 | float64](a, b metricdata.Sum[N], cfg config) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
if a.IsMonotonic != b.IsMonotonic {
reasons = append(reasons, notEqualStr("IsMonotonic", a.IsMonotonic, b.IsMonotonic))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.DataPoint[N]) bool {
r := equalDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Sum DataPoints not equal:\n"+r)
}
return reasons
}
// equalHistograms returns reasons Histograms are not equal. If they are
// equal, the returned reasons will be empty.
//
// The DataPoints each Histogram contains are compared based on containing the
// same HistogramDataPoint, not the order they are stored in.
func equalHistograms[N int64 | float64](a, b metricdata.Histogram[N], cfg config) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.HistogramDataPoint[N]) bool {
r := equalHistogramDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Histogram DataPoints not equal:\n"+r)
}
return reasons
}
// equalDataPoints returns reasons DataPoints are not equal. If they are
// equal, the returned reasons will be empty.
func equalDataPoints[N int64 | float64](
a, b metricdata.DataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
// equalHistogramDataPoints returns reasons HistogramDataPoints are not equal.
// If they are equal, the returned reasons will be empty.
func equalHistogramDataPoints[N int64 | float64](
a, b metricdata.HistogramDataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if !slices.Equal(a.Bounds, b.Bounds) {
reasons = append(reasons, notEqualStr("Bounds", a.Bounds, b.Bounds))
}
if !slices.Equal(a.BucketCounts, b.BucketCounts) {
reasons = append(reasons, notEqualStr("BucketCounts", a.BucketCounts, b.BucketCounts))
}
if !eqExtrema(a.Min, b.Min) {
reasons = append(reasons, notEqualStr("Min", a.Min, b.Min))
}
if !eqExtrema(a.Max, b.Max) {
reasons = append(reasons, notEqualStr("Max", a.Max, b.Max))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
// equalExponentialHistograms returns reasons ExponentialHistograms are not
// equal. If they are equal, the returned reasons will be empty.
//
// The DataPoints each ExponentialHistogram contains are compared based on
// containing the same ExponentialHistogramDataPoint, not the order they are
// stored in.
func equalExponentialHistograms[N int64 | float64](
a, b metricdata.ExponentialHistogram[N],
cfg config,
) (reasons []string) {
if a.Temporality != b.Temporality {
reasons = append(reasons, notEqualStr("Temporality", a.Temporality, b.Temporality))
}
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.ExponentialHistogramDataPoint[N]) bool {
r := equalExponentialHistogramDataPoints(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Histogram DataPoints not equal:\n"+r)
}
return reasons
}
// equalExponentialHistogramDataPoints returns reasons ExponentialHistogramDataPoints are not equal.
// If they are equal, the returned reasons will be empty.
func equalExponentialHistogramDataPoints[N int64 | float64](
a, b metricdata.ExponentialHistogramDataPoint[N],
cfg config,
) (reasons []string) { // nolint: revive // Intentional internal control flag
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if !eqExtrema(a.Min, b.Min) {
reasons = append(reasons, notEqualStr("Min", a.Min, b.Min))
}
if !eqExtrema(a.Max, b.Max) {
reasons = append(reasons, notEqualStr("Max", a.Max, b.Max))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
if a.Scale != b.Scale {
reasons = append(reasons, notEqualStr("Scale", a.Scale, b.Scale))
}
if a.ZeroCount != b.ZeroCount {
reasons = append(reasons, notEqualStr("ZeroCount", a.ZeroCount, b.ZeroCount))
}
r := equalExponentialBuckets(a.PositiveBucket, b.PositiveBucket, cfg)
if len(r) > 0 {
reasons = append(reasons, r...)
}
r = equalExponentialBuckets(a.NegativeBucket, b.NegativeBucket, cfg)
if len(r) > 0 {
reasons = append(reasons, r...)
}
}
if !cfg.ignoreExemplars {
r := compareDiff(diffSlices(
a.Exemplars,
b.Exemplars,
func(a, b metricdata.Exemplar[N]) bool {
r := equalExemplars(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Exemplars not equal:\n"+r)
}
}
return reasons
}
func equalExponentialBuckets(a, b metricdata.ExponentialBucket, _ config) (reasons []string) {
if a.Offset != b.Offset {
reasons = append(reasons, notEqualStr("Offset", a.Offset, b.Offset))
}
if !slices.Equal(a.Counts, b.Counts) {
reasons = append(reasons, notEqualStr("Counts", a.Counts, b.Counts))
}
return reasons
}
func equalSummary(a, b metricdata.Summary, cfg config) (reasons []string) {
r := compareDiff(diffSlices(
a.DataPoints,
b.DataPoints,
func(a, b metricdata.SummaryDataPoint) bool {
r := equalSummaryDataPoint(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, "Summary DataPoints not equal:\n"+r)
}
return reasons
}
func equalSummaryDataPoint(a, b metricdata.SummaryDataPoint, cfg config) (reasons []string) {
if !a.Attributes.Equals(&b.Attributes) {
reasons = append(reasons, notEqualStr(
"Attributes",
a.Attributes.Encoded(attribute.DefaultEncoder()),
b.Attributes.Encoded(attribute.DefaultEncoder()),
))
}
if !cfg.ignoreTimestamp {
if !a.StartTime.Equal(b.StartTime) {
reasons = append(reasons, notEqualStr("StartTime", a.StartTime.UnixNano(), b.StartTime.UnixNano()))
}
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Count != b.Count {
reasons = append(reasons, notEqualStr("Count", a.Count, b.Count))
}
if a.Sum != b.Sum {
reasons = append(reasons, notEqualStr("Sum", a.Sum, b.Sum))
}
r := compareDiff(diffSlices(
a.QuantileValues,
b.QuantileValues,
func(a, b metricdata.QuantileValue) bool {
r := equalQuantileValue(a, b, cfg)
return len(r) == 0
},
))
if r != "" {
reasons = append(reasons, r)
}
}
return reasons
}
func equalQuantileValue(a, b metricdata.QuantileValue, _ config) (reasons []string) {
if a.Quantile != b.Quantile {
reasons = append(reasons, notEqualStr("Quantile", a.Quantile, b.Quantile))
}
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
return reasons
}
func notEqualStr(prefix string, expected, actual any) string {
return fmt.Sprintf("%s not equal:\nexpected: %v\nactual: %v", prefix, expected, actual)
}
func equalExtrema[N int64 | float64](a, b metricdata.Extrema[N], _ config) (reasons []string) {
if !eqExtrema(a, b) {
reasons = append(reasons, notEqualStr("Extrema", a, b))
}
return reasons
}
func eqExtrema[N int64 | float64](a, b metricdata.Extrema[N]) bool {
aV, aOk := a.Value()
bV, bOk := b.Value()
if !aOk || !bOk {
return aOk == bOk
}
return aV == bV
}
func equalKeyValue(a, b attribute.KeyValue) bool {
if a.Key != b.Key {
return false
}
if a.Value.Type() != b.Value.Type() {
return false
}
switch a.Value.Type() {
case attribute.BOOL:
if a.Value.AsBool() != b.Value.AsBool() {
return false
}
case attribute.INT64:
if a.Value.AsInt64() != b.Value.AsInt64() {
return false
}
case attribute.FLOAT64:
if a.Value.AsFloat64() != b.Value.AsFloat64() {
return false
}
case attribute.STRING:
if a.Value.AsString() != b.Value.AsString() {
return false
}
case attribute.BOOLSLICE:
if ok := slices.Equal(a.Value.AsBoolSlice(), b.Value.AsBoolSlice()); !ok {
return false
}
case attribute.INT64SLICE:
if ok := slices.Equal(a.Value.AsInt64Slice(), b.Value.AsInt64Slice()); !ok {
return false
}
case attribute.FLOAT64SLICE:
if ok := slices.Equal(a.Value.AsFloat64Slice(), b.Value.AsFloat64Slice()); !ok {
return false
}
case attribute.STRINGSLICE:
if ok := slices.Equal(a.Value.AsStringSlice(), b.Value.AsStringSlice()); !ok {
return false
}
default:
// We control all types passed to this; panic to signal developers
// early that they changed things in an incompatible way.
panic(fmt.Sprintf("unknown attribute value type: %s", a.Value.Type()))
}
return true
}
func equalExemplars[N int64 | float64](a, b metricdata.Exemplar[N], cfg config) (reasons []string) {
if !slices.EqualFunc(a.FilteredAttributes, b.FilteredAttributes, equalKeyValue) {
reasons = append(reasons, notEqualStr("FilteredAttributes", a.FilteredAttributes, b.FilteredAttributes))
}
if !cfg.ignoreTimestamp {
if !a.Time.Equal(b.Time) {
reasons = append(reasons, notEqualStr("Time", a.Time.UnixNano(), b.Time.UnixNano()))
}
}
if !cfg.ignoreValue {
if a.Value != b.Value {
reasons = append(reasons, notEqualStr("Value", a.Value, b.Value))
}
}
if !slices.Equal(a.SpanID, b.SpanID) {
reasons = append(reasons, notEqualStr("SpanID", a.SpanID, b.SpanID))
}
if !slices.Equal(a.TraceID, b.TraceID) {
reasons = append(reasons, notEqualStr("TraceID", a.TraceID, b.TraceID))
}
return reasons
}
// diffSlices compares a and b as unordered collections using equal. It
// returns the elements of a with no match in b (extraA) and the elements of
// b with no match in a (extraB); each element is matched at most once.
func diffSlices[T any](a, b []T, equal func(T, T) bool) (extraA, extraB []T) {
visited := make([]bool, len(b))
for i := range a {
found := false
for j := range b {
if visited[j] {
continue
}
if equal(a[i], b[j]) {
visited[j] = true
found = true
break
}
}
if !found {
extraA = append(extraA, a[i])
}
}
for j := range b {
if visited[j] {
continue
}
extraB = append(extraB, b[j])
}
return extraA, extraB
}
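// For example, diffSlices([]int{1, 2, 2}, []int{2, 1}, func(x, y int) bool { return x == y })
// returns extraA = [2] and extraB = nil: order does not matter, but
// multiplicity does.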
// compareDiff formats the output of diffSlices into a human-readable report.
// It returns an empty string when both slices are empty.
func compareDiff[T any](extraExpected, extraActual []T) string {
if len(extraExpected) == 0 && len(extraActual) == 0 {
return ""
}
formatter := func(v T) string {
return fmt.Sprintf("%#v", v)
}
var msg bytes.Buffer
if len(extraExpected) > 0 {
_, _ = msg.WriteString("missing expected values:\n")
for _, v := range extraExpected {
_, _ = msg.WriteString(formatter(v) + "\n")
}
}
if len(extraActual) > 0 {
_, _ = msg.WriteString("unexpected additional values:\n")
for _, v := range extraActual {
_, _ = msg.WriteString(formatter(v) + "\n")
}
}
return msg.String()
}
func missingAttrStr(name string) string {
return "missing attribute " + name
}
func hasAttributesExemplars[T int64 | float64](
exemplar metricdata.Exemplar[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
s := attribute.NewSet(exemplar.FilteredAttributes...)
for _, attr := range attrs {
val, ok := s.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesDataPoints[T int64 | float64](
dp metricdata.DataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesGauge[T int64 | float64](gauge metricdata.Gauge[T], attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range gauge.DataPoints {
reas := hasAttributesDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("gauge datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSum[T int64 | float64](sum metricdata.Sum[T], attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range sum.DataPoints {
reas := hasAttributesDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("sum datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesHistogramDataPoints[T int64 | float64](
dp metricdata.HistogramDataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesHistogram[T int64 | float64](
histogram metricdata.Histogram[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for n, dp := range histogram.DataPoints {
reas := hasAttributesHistogramDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("histogram datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesExponentialHistogramDataPoints[T int64 | float64](
dp metricdata.ExponentialHistogramDataPoint[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
func hasAttributesExponentialHistogram[T int64 | float64](
histogram metricdata.ExponentialHistogram[T],
attrs ...attribute.KeyValue,
) (reasons []string) {
for n, dp := range histogram.DataPoints {
reas := hasAttributesExponentialHistogramDataPoints(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("histogram datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesAggregation(agg metricdata.Aggregation, attrs ...attribute.KeyValue) (reasons []string) {
switch agg := agg.(type) {
case metricdata.Gauge[int64]:
reasons = hasAttributesGauge(agg, attrs...)
case metricdata.Gauge[float64]:
reasons = hasAttributesGauge(agg, attrs...)
case metricdata.Sum[int64]:
reasons = hasAttributesSum(agg, attrs...)
case metricdata.Sum[float64]:
reasons = hasAttributesSum(agg, attrs...)
case metricdata.Histogram[int64]:
reasons = hasAttributesHistogram(agg, attrs...)
case metricdata.Histogram[float64]:
reasons = hasAttributesHistogram(agg, attrs...)
case metricdata.ExponentialHistogram[int64]:
reasons = hasAttributesExponentialHistogram(agg, attrs...)
case metricdata.ExponentialHistogram[float64]:
reasons = hasAttributesExponentialHistogram(agg, attrs...)
case metricdata.Summary:
reasons = hasAttributesSummary(agg, attrs...)
default:
reasons = []string{fmt.Sprintf("unknown aggregation %T", agg)}
}
return reasons
}
func hasAttributesMetrics(metrics metricdata.Metrics, attrs ...attribute.KeyValue) (reasons []string) {
reas := hasAttributesAggregation(metrics.Data, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("Metric %s:\n", metrics.Name))
reasons = append(reasons, reas...)
}
return reasons
}
func hasAttributesScopeMetrics(sm metricdata.ScopeMetrics, attrs ...attribute.KeyValue) (reasons []string) {
for n, metrics := range sm.Metrics {
reas := hasAttributesMetrics(metrics, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("ScopeMetrics %s Metrics %d:\n", sm.Scope.Name, n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesResourceMetrics(rm metricdata.ResourceMetrics, attrs ...attribute.KeyValue) (reasons []string) {
for n, sm := range rm.ScopeMetrics {
reas := hasAttributesScopeMetrics(sm, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("ResourceMetrics ScopeMetrics %d:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSummary(summary metricdata.Summary, attrs ...attribute.KeyValue) (reasons []string) {
for n, dp := range summary.DataPoints {
reas := hasAttributesSummaryDataPoint(dp, attrs...)
if len(reas) > 0 {
reasons = append(reasons, fmt.Sprintf("summary datapoint %d attributes:\n", n))
reasons = append(reasons, reas...)
}
}
return reasons
}
func hasAttributesSummaryDataPoint(dp metricdata.SummaryDataPoint, attrs ...attribute.KeyValue) (reasons []string) {
for _, attr := range attrs {
val, ok := dp.Attributes.Value(attr.Key)
if !ok {
reasons = append(reasons, missingAttrStr(string(attr.Key)))
continue
}
if val != attr.Value {
reasons = append(reasons, notEqualStr(string(attr.Key), attr.Value.Emit(), val.Emit()))
}
}
return reasons
}
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:generate stringer -type=Temporality
package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
// Temporality defines the window that an aggregation was calculated over.
type Temporality uint8
const (
// undefinedTemporality represents an unset Temporality.
//nolint:unused
undefinedTemporality Temporality = iota
// CumulativeTemporality defines a measurement interval that continues to
// expand forward in time from a starting point. New measurements are
// added to all previous measurements since a start time.
CumulativeTemporality
// DeltaTemporality defines a measurement interval that resets each cycle.
// Measurements from one cycle are recorded independently; measurements
// from other cycles do not affect them.
DeltaTemporality
)
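// For example, if a counter records 2, 3, and 5 over three successive
// collection cycles, a cumulative stream reports the running totals 2, 5,
// and 10, while a delta stream reports 2, 3, and 5, each interval
// independently.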
// MarshalText returns the byte encoding of t.
func (t Temporality) MarshalText() ([]byte, error) {
return []byte(t.String()), nil
}
// Code generated by "stringer -type=Temporality"; DO NOT EDIT.
package metricdata
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[undefinedTemporality-0]
_ = x[CumulativeTemporality-1]
_ = x[DeltaTemporality-2]
}
const _Temporality_name = "undefinedTemporalityCumulativeTemporalityDeltaTemporality"
var _Temporality_index = [...]uint8{0, 20, 41, 57}
func (i Temporality) String() string {
if i >= Temporality(len(_Temporality_index)-1) {
return "Temporality(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Temporality_name[_Temporality_index[i]:_Temporality_index[i+1]]
}