// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // now is used to return the current local time while allowing tests to // override the default time.Now function. var now = time.Now // Measure receives measurements to be aggregated. type Measure[N int64 | float64] func(context.Context, N, attribute.Set) // ComputeAggregation stores the aggregate of measurements into dest and // returns the number of aggregate data-points output. type ComputeAggregation func(dest *metricdata.Aggregation) int // Builder builds an aggregate function. type Builder[N int64 | float64] struct { // Temporality is the temporality used for the returned aggregate function. // // If this is not provided a default of cumulative will be used (except for // the last-value aggregate function where delta is the only appropriate // temporality). Temporality metricdata.Temporality // Filter is the attribute filter the aggregate function will use on the // input of measurements. Filter attribute.Filter // ReservoirFunc is the factory function used by aggregate functions to // create new exemplar reservoirs for a new seen attribute set. // // If this is not provided a default factory function that returns an // dropReservoir reservoir will be used. ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" // attribute. // // If AggregationLimit is less than or equal to zero there will not be an // aggregation limit imposed (i.e. unlimited attribute sets). AggregationLimit int } func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } return dropReservoir } type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { if b.Filter != nil { fltr := b.Filter // Copy to make it immutable after assignment. return func(ctx context.Context, n N, a attribute.Set) { fAttr, dropped := a.Filter(fltr) f(ctx, n, fAttr, dropped) } } return func(ctx context.Context, n N, a attribute.Set) { f(ctx, n, a, nil) } } // LastValue returns a last-value aggregate function input and output. func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { lv := newLastValue[N](b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(lv.measure), lv.delta default: return b.filter(lv.measure), lv.cumulative } } // PrecomputedLastValue returns a last-value aggregate function input and // output. The aggregation returned from the returned ComputeAggregation // function will always only return values from the previous collection cycle. func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) { lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(lv.measure), lv.delta default: return b.filter(lv.measure), lv.cumulative } } // PrecomputedSum returns a sum aggregate function input and output. The // arguments passed to the input are expected to be the precomputed sum values. func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta default: return b.filter(s.measure), s.cumulative } } // Sum returns a sum aggregate function input and output. func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { s := newSum[N](monotonic, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(s.measure), s.delta default: return b.filter(s.measure), s.cumulative } } // ExplicitBucketHistogram returns a histogram aggregate function input and // output. func (b Builder[N]) ExplicitBucketHistogram( boundaries []float64, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta default: return b.filter(h.measure), h.cumulative } } // ExponentialBucketHistogram returns a histogram aggregate function input and // output. func (b Builder[N]) ExponentialBucketHistogram( maxSize, maxScale int32, noMinMax, noSum bool, ) (Measure[N], ComputeAggregation) { h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc()) switch b.Temporality { case metricdata.DeltaTemporality: return b.filter(h.measure), h.delta default: return b.filter(h.measure), h.cumulative } } // reset ensures s has capacity and sets it length. If the capacity of s too // small, a new slice is returned with the specified capacity and length. func reset[T any](s []T, length, capacity int) []T { if cap(s) < capacity { return make([]T, length, capacity) } return s[:length] }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" ) // dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] { return &dropRes[N]{} } type dropRes[N int64 | float64] struct{} // Offer does nothing, all measurements offered will be dropped. func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {} // Collect resets dest. No exemplars will ever be returned. func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) { clear(*dest) // Erase elements to let GC collect objects *dest = (*dest)[:0] }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "sync" "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) var exemplarPool = sync.Pool{ New: func() any { return new([]exemplar.Exemplar) }, } func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) { dest := exemplarPool.Get().(*[]exemplar.Exemplar) defer func() { clear(*dest) // Erase elements to let GC collect objects. *dest = (*dest)[:0] exemplarPool.Put(dest) }() *dest = reset(*dest, len(*out), cap(*out)) f(dest) *out = reset(*out, len(*dest), cap(*dest)) for i, e := range *dest { (*out)[i].FilteredAttributes = e.FilteredAttributes (*out)[i].Time = e.Time (*out)[i].SpanID = e.SpanID (*out)[i].TraceID = e.TraceID switch e.Value.Type() { case exemplar.Int64ValueType: (*out)[i].Value = N(e.Value.Int64()) case exemplar.Float64ValueType: (*out)[i].Value = N(e.Value.Float64()) } } }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "errors" "math" "sync" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) const ( expoMaxScale = 20 expoMinScale = -10 smallestNonZeroNormalFloat64 = 0x1p-1022 // These redefine the Math constants with a type, so the compiler won't coerce // them into an int on 32 bit platforms. maxInt64 int64 = math.MaxInt64 minInt64 int64 = math.MinInt64 ) // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { attrs attribute.Set res FilteredExemplarReservoir[N] count uint64 min N max N sum N maxSize int noMinMax bool noSum bool scale int32 posBuckets expoBuckets negBuckets expoBuckets zeroCount uint64 } func newExpoHistogramDataPoint[N int64 | float64]( attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool, ) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag f := math.MaxFloat64 ma := N(f) // if N is int64, max will overflow to -9223372036854775808 mi := N(-f) if N(maxInt64) > N(f) { ma = N(maxInt64) mi = N(minInt64) } return &expoHistogramDataPoint[N]{ attrs: attrs, min: ma, max: mi, maxSize: maxSize, noMinMax: noMinMax, noSum: noSum, scale: maxScale, } } // record adds a new measurement to the histogram. It will rescale the buckets if needed. func (p *expoHistogramDataPoint[N]) record(v N) { p.count++ if !p.noMinMax { if v < p.min { p.min = v } if v > p.max { p.max = v } } if !p.noSum { p.sum += v } absV := math.Abs(float64(v)) if float64(absV) == 0.0 { p.zeroCount++ return } bin := p.getBin(absV) bucket := &p.posBuckets if v < 0 { bucket = &p.negBuckets } // If the new bin would make the counts larger than maxScale, we need to // downscale current measurements. if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 { if p.scale-scaleDelta < expoMinScale { // With a scale of -10 there is only two buckets for the whole range of float64 values. // This can only happen if there is a max size of 1. otel.Handle(errors.New("exponential histogram scale underflow")) return } // Downscale p.scale -= scaleDelta p.posBuckets.downscale(scaleDelta) p.negBuckets.downscale(scaleDelta) bin = p.getBin(absV) } bucket.record(bin) } // getBin returns the bin v should be recorded into. func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 { frac, expInt := math.Frexp(v) // 11-bit exponential. exp := int32(expInt) // nolint: gosec if p.scale <= 0 { // Because of the choice of fraction is always 1 power of two higher than we want. var correction int32 = 1 if frac == .5 { // If v is an exact power of two the frac will be .5 and the exp // will be one higher than we want. correction = 2 } return (exp - correction) >> (-p.scale) } return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1 } // scaleFactors are constants used in calculating the logarithm index. They are // equivalent to 2^index/log(2). var scaleFactors = [21]float64{ math.Ldexp(math.Log2E, 0), math.Ldexp(math.Log2E, 1), math.Ldexp(math.Log2E, 2), math.Ldexp(math.Log2E, 3), math.Ldexp(math.Log2E, 4), math.Ldexp(math.Log2E, 5), math.Ldexp(math.Log2E, 6), math.Ldexp(math.Log2E, 7), math.Ldexp(math.Log2E, 8), math.Ldexp(math.Log2E, 9), math.Ldexp(math.Log2E, 10), math.Ldexp(math.Log2E, 11), math.Ldexp(math.Log2E, 12), math.Ldexp(math.Log2E, 13), math.Ldexp(math.Log2E, 14), math.Ldexp(math.Log2E, 15), math.Ldexp(math.Log2E, 16), math.Ldexp(math.Log2E, 17), math.Ldexp(math.Log2E, 18), math.Ldexp(math.Log2E, 19), math.Ldexp(math.Log2E, 20), } // scaleChange returns the magnitude of the scale change needed to fit bin in // the bucket. If no scale change is needed 0 is returned. func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 { if length == 0 { // No need to rescale if there are no buckets. return 0 } low := int(startBin) high := int(bin) if startBin >= bin { low = int(bin) high = int(startBin) + length - 1 } var count int32 for high-low >= p.maxSize { low = low >> 1 high = high >> 1 count++ if count > expoMaxScale-expoMinScale { return count } } return count } // expoBuckets is a set of buckets in an exponential histogram. type expoBuckets struct { startBin int32 counts []uint64 } // record increments the count for the given bin, and expands the buckets if needed. // Size changes must be done before calling this function. func (b *expoBuckets) record(bin int32) { if len(b.counts) == 0 { b.counts = []uint64{1} b.startBin = bin return } endBin := int(b.startBin) + len(b.counts) - 1 // if the new bin is inside the current range if bin >= b.startBin && int(bin) <= endBin { b.counts[bin-b.startBin]++ return } // if the new bin is before the current start add spaces to the counts if bin < b.startBin { origLen := len(b.counts) newLength := endBin - int(bin) + 1 shift := b.startBin - bin if newLength > cap(b.counts) { b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...) } copy(b.counts[shift:origLen+int(shift)], b.counts[:]) b.counts = b.counts[:newLength] for i := 1; i < int(shift); i++ { b.counts[i] = 0 } b.startBin = bin b.counts[0] = 1 return } // if the new is after the end add spaces to the end if int(bin) > endBin { if int(bin-b.startBin) < cap(b.counts) { b.counts = b.counts[:bin-b.startBin+1] for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ { b.counts[i] = 0 } b.counts[bin-b.startBin] = 1 return } end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1) b.counts = append(b.counts, end...) b.counts[bin-b.startBin] = 1 } } // downscale shrinks a bucket by a factor of 2*s. It will sum counts into the // correct lower resolution bucket. func (b *expoBuckets) downscale(delta int32) { // Example // delta = 2 // Original offset: -6 // Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] // bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4 // new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1 // new Offset: -2 // new Counts: [4, 14, 30, 10] if len(b.counts) <= 1 || delta < 1 { b.startBin = b.startBin >> delta return } steps := int32(1) << delta offset := b.startBin % steps offset = (offset + steps) % steps // to make offset positive for i := 1; i < len(b.counts); i++ { idx := i + int(offset) if idx%int(steps) == 0 { b.counts[idx/int(steps)] = b.counts[i] continue } b.counts[idx/int(steps)] += b.counts[i] } lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps) b.counts = b.counts[:lastIdx+1] b.startBin = b.startBin >> delta } // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. func newExponentialHistogram[N int64 | float64]( maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, maxSize: int(maxSize), maxScale: maxScale, newRes: r, limit: newLimiter[*expoHistogramDataPoint[N]](limit), values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), start: now(), } } // expoHistogram summarizes a set of measurements as an histogram with exponentially // defined buckets. type expoHistogram[N int64 | float64] struct { noSum bool noMinMax bool maxSize int maxScale int32 newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex start time.Time } func (e *expoHistogram[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue, ) { // Ignore NaN and infinity. if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) { return } e.valuesMu.Lock() defer e.valuesMu.Unlock() attr := e.limit.Attributes(fltrAttr, e.values) v, ok := e.values[attr.Equivalent()] if !ok { v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) v.res = e.newRes(attr) e.values[attr.Equivalent()] = v } v.record(value) v.res.Offer(ctx, value, droppedAttr) } func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed. // In that case, use the zero-value h and hope for better alignment next cycle. h, _ := (*dest).(metricdata.ExponentialHistogram[N]) h.Temporality = metricdata.DeltaTemporality e.valuesMu.Lock() defer e.valuesMu.Unlock() n := len(e.values) hDPts := reset(h.DataPoints, n, n) var i int for _, val := range e.values { hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t hDPts[i].Count = val.count hDPts[i].Scale = val.scale hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin hDPts[i].PositiveBucket.Counts = reset( hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts), ) copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin hDPts[i].NegativeBucket.Counts = reset( hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts), ) copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { hDPts[i].Sum = val.sum } if !e.noMinMax { hDPts[i].Min = metricdata.NewExtrema(val.min) hDPts[i].Max = metricdata.NewExtrema(val.max) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ } // Unused attribute sets do not report. clear(e.values) e.start = t h.DataPoints = hDPts *dest = h return n } func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed. // In that case, use the zero-value h and hope for better alignment next cycle. h, _ := (*dest).(metricdata.ExponentialHistogram[N]) h.Temporality = metricdata.CumulativeTemporality e.valuesMu.Lock() defer e.valuesMu.Unlock() n := len(e.values) hDPts := reset(h.DataPoints, n, n) var i int for _, val := range e.values { hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t hDPts[i].Count = val.count hDPts[i].Scale = val.scale hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin hDPts[i].PositiveBucket.Counts = reset( hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts), ) copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin hDPts[i].NegativeBucket.Counts = reset( hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts), ) copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { hDPts[i].Sum = val.sum } if !e.noMinMax { hDPts[i].Min = metricdata.NewExtrema(val.min) hDPts[i].Max = metricdata.NewExtrema(val.max) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. } h.DataPoints = hDPts *dest = h return n }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" ) // FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter. type FilteredExemplarReservoir[N int64 | float64] interface { // Offer accepts the parameters associated with a measurement. The // parameters will be stored as an exemplar if the filter decides to // sample the measurement. // // The passed ctx needs to contain any baggage or span that were active // when the measurement was made. This information may be used by the // Reservoir in making a sampling decision. Offer(ctx context.Context, val N, attr []attribute.KeyValue) // Collect returns all the held exemplars in the reservoir. Collect(dest *[]exemplar.Exemplar) } // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made. type filteredExemplarReservoir[N int64 | float64] struct { filter exemplar.Filter reservoir exemplar.Reservoir } // NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values // that are allowed by the filter. func NewFilteredExemplarReservoir[N int64 | float64]( f exemplar.Filter, r exemplar.Reservoir, ) FilteredExemplarReservoir[N] { return &filteredExemplarReservoir[N]{ filter: f, reservoir: r, } } func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { if f.filter(ctx) { // only record the current time if we are sampling this measurement. f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr) } } func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "slices" "sort" "sync" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type buckets[N int64 | float64] struct { attrs attribute.Set res FilteredExemplarReservoir[N] counts []uint64 count uint64 total N min, max N } // newBuckets returns buckets with n bins. func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} } func (b *buckets[N]) sum(value N) { b.total += value } func (b *buckets[N]) bin(idx int, value N) { b.counts[idx]++ b.count++ if value < b.min { b.min = value } else if value > b.max { b.max = value } } // histValues summarizes a set of measurements as an histValues with // explicitly defined buckets. type histValues[N int64 | float64] struct { noSum bool bounds []float64 newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } func newHistValues[N int64 | float64]( bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have // complete control over the fix. b := slices.Clone(bounds) slices.Sort(b) return &histValues[N]{ noSum: noSum, bounds: b, newRes: r, limit: newLimiter[*buckets[N]](limit), values: make(map[attribute.Distinct]*buckets[N]), } } // Aggregate records the measurement value, scoped by attr, and aggregates it // into a histogram. func (s *histValues[N]) measure( ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue, ) { // This search will return an index in the range [0, len(s.bounds)], where // it will return len(s.bounds) if value is greater than the last element // of s.bounds. This aligns with the buckets in that the length of buckets // is len(s.bounds)+1, with the last bucket representing: // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) s.valuesMu.Lock() defer s.valuesMu.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) b, ok := s.values[attr.Equivalent()] if !ok { // N+1 buckets. For example: // // bounds = [0, 5, 10] // // Then, // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) b = newBuckets[N](attr, len(s.bounds)+1) b.res = s.newRes(attr) // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value s.values[attr.Equivalent()] = b } b.bin(idx, value) if !s.noSum { b.sum(value) } b.res.Offer(ctx, value, droppedAttr) } // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. func newHistogram[N int64 | float64]( boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, start: now(), } } // histogram summarizes a set of measurements as an histogram with explicitly // defined buckets. type histogram[N int64 | float64] struct { *histValues[N] noMinMax bool start time.Time } func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.Histogram, memory reuse is missed. In that // case, use the zero-value h and hope for better alignment next cycle. h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.DeltaTemporality s.valuesMu.Lock() defer s.valuesMu.Unlock() // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) n := len(s.values) hDPts := reset(h.DataPoints, n, n) var i int for _, val := range s.values { hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t hDPts[i].Count = val.count hDPts[i].Bounds = bounds hDPts[i].BucketCounts = val.counts if !s.noSum { hDPts[i].Sum = val.total } if !s.noMinMax { hDPts[i].Min = metricdata.NewExtrema(val.min) hDPts[i].Max = metricdata.NewExtrema(val.max) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ } // Unused attribute sets do not report. clear(s.values) // The delta collection cycle resets. s.start = t h.DataPoints = hDPts *dest = h return n } func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.Histogram, memory reuse is missed. In that // case, use the zero-value h and hope for better alignment next cycle. h, _ := (*dest).(metricdata.Histogram[N]) h.Temporality = metricdata.CumulativeTemporality s.valuesMu.Lock() defer s.valuesMu.Unlock() // Do not allow modification of our copy of bounds. bounds := slices.Clone(s.bounds) n := len(s.values) hDPts := reset(h.DataPoints, n, n) var i int for _, val := range s.values { hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t hDPts[i].Count = val.count hDPts[i].Bounds = bounds // The HistogramDataPoint field values returned need to be copies of // the buckets value as we will keep updating them. // // TODO (#3047): Making copies for bounds and counts incurs a large // memory allocation footprint. Alternatives should be explored. hDPts[i].BucketCounts = slices.Clone(val.counts) if !s.noSum { hDPts[i].Sum = val.total } if !s.noMinMax { hDPts[i].Min = metricdata.NewExtrema(val.min) hDPts[i].Max = metricdata.NewExtrema(val.max) } collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. } h.DataPoints = hDPts *dest = h return n }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "sync" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { attrs attribute.Set value N res FilteredExemplarReservoir[N] } func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), values: make(map[attribute.Distinct]datapoint[N]), start: now(), } } // lastValue summarizes a set of measurements as the last one made. type lastValue[N int64 | float64] struct { sync.Mutex newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { s.Lock() defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) d, ok := s.values[attr.Equivalent()] if !ok { d.res = s.newRes(attr) } d.attrs = attr d.value = value d.res.Offer(ctx, value, droppedAttr) s.values[attr.Equivalent()] = d } func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) s.Lock() defer s.Unlock() n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. s.start = t *dest = gData return n } func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) s.Lock() defer s.Unlock() n := s.copyDpts(&gData.DataPoints, t) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. *dest = gData return n } // copyDpts copies the datapoints held by s into dest. The number of datapoints // copied is returned. func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int { n := len(s.values) *dest = reset(*dest, n, n) var i int for _, v := range s.values { (*dest)[i].Attributes = v.attrs (*dest)[i].StartTime = s.start (*dest)[i].Time = t (*dest)[i].Value = v.value collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } return n } // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. func newPrecomputedLastValue[N int64 | float64]( limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } // precomputedLastValue summarizes a set of observations as the last one made. type precomputedLastValue[N int64 | float64] struct { *lastValue[N] } func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) s.Lock() defer s.Unlock() n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. s.start = t *dest = gData return n } func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) s.Lock() defer s.Unlock() n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) *dest = gData return n }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import "go.opentelemetry.io/otel/attribute" // overflowSet is the attribute set used to record a measurement when adding // another distinct attribute set to the aggregate would exceed the aggregate // limit. var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true)) // limiter limits aggregate values. type limiter[V any] struct { // aggLimit is the maximum number of metric streams that can be aggregated. // // Any metric stream with attributes distinct from any set already // aggregated once the aggLimit will be meet will instead be aggregated // into an "overflow" metric stream. That stream will only contain the // "otel.metric.overflow"=true attribute. aggLimit int } // newLimiter returns a new Limiter with the provided aggregation limit. func newLimiter[V any](aggregation int) limiter[V] { return limiter[V]{aggLimit: aggregation} } // Attributes checks if adding a measurement for attrs will exceed the // aggregation cardinality limit for the existing measurements. If it will, // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { if l.aggLimit > 0 { _, exists := measurements[attrs.Equivalent()] if !exists && len(measurements) >= l.aggLimit-1 { return overflowSet } } return attrs }
// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" "sync" "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type sumValue[N int64 | float64] struct { n N res FilteredExemplarReservoir[N] attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), values: make(map[attribute.Distinct]sumValue[N]), } } func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { s.Lock() defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) v, ok := s.values[attr.Equivalent()] if !ok { v.res = s.newRes(attr) } v.attrs = attr v.n += value v.res.Offer(ctx, value, droppedAttr) s.values[attr.Equivalent()] = v } // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } } // sum summarizes a set of measurements made as their arithmetic sum. type sum[N int64 | float64] struct { *valueMap[N] monotonic bool start time.Time } func (s *sum[N]) delta(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. sData, _ := (*dest).(metricdata.Sum[N]) sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic s.Lock() defer s.Unlock() n := len(s.values) dPts := reset(sData.DataPoints, n, n) var i int for _, val := range s.values { dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } // Do not report stale values. clear(s.values) // The delta collection cycle resets. s.start = t sData.DataPoints = dPts *dest = sData return n } func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. sData, _ := (*dest).(metricdata.Sum[N]) sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic s.Lock() defer s.Unlock() n := len(s.values) dPts := reset(sData.DataPoints, n, n) var i int for _, value := range s.values { dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = value.n collectExemplars(&dPts[i].Exemplars, value.res.Collect) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. i++ } sData.DataPoints = dPts *dest = sData return n } // newPrecomputedSum returns an aggregator that summarizes a set of // observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. func newPrecomputedSum[N int64 | float64]( monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N], ) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } } // precomputedSum summarizes a set of observations as their arithmetic sum. type precomputedSum[N int64 | float64] struct { *valueMap[N] monotonic bool start time.Time reported map[attribute.Distinct]N } func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { t := now() newReported := make(map[attribute.Distinct]N) // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. sData, _ := (*dest).(metricdata.Sum[N]) sData.Temporality = metricdata.DeltaTemporality sData.IsMonotonic = s.monotonic s.Lock() defer s.Unlock() n := len(s.values) dPts := reset(sData.DataPoints, n, n) var i int for key, value := range s.values { delta := value.n - s.reported[key] dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta collectExemplars(&dPts[i].Exemplars, value.res.Collect) newReported[key] = value.n i++ } // Unused attribute sets do not report. clear(s.values) s.reported = newReported // The delta collection cycle resets. s.start = t sData.DataPoints = dPts *dest = sData return n } func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { t := now() // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. sData, _ := (*dest).(metricdata.Sum[N]) sData.Temporality = metricdata.CumulativeTemporality sData.IsMonotonic = s.monotonic s.Lock() defer s.Unlock() n := len(s.values) dPts := reset(sData.DataPoints, n, n) var i int for _, val := range s.values { dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } // Unused attribute sets do not report. clear(s.values) sData.DataPoints = dPts *dest = sData return n }