Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package sstable 6 : 7 : import ( 8 : "cmp" 9 : "slices" 10 : "sync" 11 : "time" 12 : 13 : "github.com/cockroachdb/errors" 14 : "github.com/cockroachdb/redact" 15 : ) 16 : 17 : // Category is a user-understandable string, where stats are aggregated for 18 : // each category. The cardinality of this should be low, say < 20. The prefix 19 : // "pebble-" is reserved for internal Pebble categories. 20 : // 21 : // Examples of categories that can be useful in the CockroachDB context are: 22 : // sql-user, sql-stats, raft, rangefeed, mvcc-gc, range-snapshot. 23 : type Category string 24 : 25 : // QoSLevel describes whether the read is latency-sensitive or not. Each 26 : // category must map to a single QoSLevel. While category strings are opaque 27 : // to Pebble, the QoSLevel may be internally utilized in Pebble to better 28 : // optimize future reads. 29 : type QoSLevel int 30 : 31 : const ( 32 : // LatencySensitiveQoSLevel is the default when QoSLevel is not specified, 33 : // and represents reads that are latency-sensitive. 34 : LatencySensitiveQoSLevel QoSLevel = iota 35 : // NonLatencySensitiveQoSLevel represents reads that are not 36 : // latency-sensitive. 37 : NonLatencySensitiveQoSLevel 38 : ) 39 : 40 : // SafeFormat implements the redact.SafeFormatter interface. 41 1 : func (q QoSLevel) SafeFormat(p redact.SafePrinter, verb rune) { 42 1 : switch q { 43 1 : case LatencySensitiveQoSLevel: 44 1 : p.Printf("latency") 45 1 : case NonLatencySensitiveQoSLevel: 46 1 : p.Printf("non-latency") 47 0 : default: 48 0 : p.Printf("<unknown-qos>") 49 : } 50 : } 51 : 52 : // StringToQoSForTesting returns the QoSLevel for the string, or panics if the 53 : // string is not known. 54 1 : func StringToQoSForTesting(s string) QoSLevel { 55 1 : switch s { 56 1 : case "latency": 57 1 : return LatencySensitiveQoSLevel 58 1 : case "non-latency": 59 1 : return NonLatencySensitiveQoSLevel 60 : } 61 0 : panic(errors.AssertionFailedf("unknown QoS %s", s)) 62 : } 63 : 64 : // CategoryAndQoS specifies both the Category and the QoSLevel. 65 : type CategoryAndQoS struct { 66 : Category 67 : QoSLevel 68 : } 69 : 70 : // CategoryStats provides stats about a category of reads. 71 : type CategoryStats struct { 72 : // BlockBytes is the bytes in the loaded blocks. If the block was 73 : // compressed, this is the compressed bytes. Currently, only the index 74 : // blocks, data blocks containing points, and filter blocks are included. 75 : // Additionally, value blocks read after the corresponding iterator is 76 : // closed are not included. 77 : BlockBytes uint64 78 : // BlockBytesInCache is the subset of BlockBytes that were in the block 79 : // cache. 80 : BlockBytesInCache uint64 81 : // BlockReadDuration is the total duration to read the bytes not in the 82 : // cache, i.e., BlockBytes-BlockBytesInCache. 83 : BlockReadDuration time.Duration 84 : } 85 : 86 1 : func (s *CategoryStats) aggregate(a CategoryStats) { 87 1 : s.BlockBytes += a.BlockBytes 88 1 : s.BlockBytesInCache += a.BlockBytesInCache 89 1 : s.BlockReadDuration += a.BlockReadDuration 90 1 : } 91 : 92 : // CategoryStatsAggregate is the aggregate for the given category. 93 : type CategoryStatsAggregate struct { 94 : Category 95 : QoSLevel 96 : CategoryStats 97 : } 98 : 99 : type categoryStatsWithMu struct { 100 : mu sync.Mutex 101 : // Protected by mu. 102 : stats CategoryStatsAggregate 103 : } 104 : 105 : // CategoryStatsCollector collects and aggregates the stats per category. 106 : type CategoryStatsCollector struct { 107 : // mu protects additions to statsMap. 108 : mu sync.Mutex 109 : // Category => categoryStatsWithMu. 110 : statsMap sync.Map 111 : } 112 : 113 : func (c *CategoryStatsCollector) reportStats( 114 : category Category, qosLevel QoSLevel, stats CategoryStats, 115 1 : ) { 116 1 : v, ok := c.statsMap.Load(category) 117 1 : if !ok { 118 1 : c.mu.Lock() 119 1 : v, _ = c.statsMap.LoadOrStore(category, &categoryStatsWithMu{ 120 1 : stats: CategoryStatsAggregate{Category: category, QoSLevel: qosLevel}, 121 1 : }) 122 1 : c.mu.Unlock() 123 1 : } 124 1 : aggStats := v.(*categoryStatsWithMu) 125 1 : aggStats.mu.Lock() 126 1 : aggStats.stats.CategoryStats.aggregate(stats) 127 1 : aggStats.mu.Unlock() 128 : } 129 : 130 : // GetStats returns the aggregated stats. 131 1 : func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate { 132 1 : var stats []CategoryStatsAggregate 133 1 : c.statsMap.Range(func(_, v any) bool { 134 1 : aggStats := v.(*categoryStatsWithMu) 135 1 : aggStats.mu.Lock() 136 1 : s := aggStats.stats 137 1 : aggStats.mu.Unlock() 138 1 : if len(s.Category) == 0 { 139 1 : s.Category = "_unknown" 140 1 : } 141 1 : stats = append(stats, s) 142 1 : return true 143 : }) 144 1 : slices.SortFunc(stats, func(a, b CategoryStatsAggregate) int { 145 1 : return cmp.Compare(a.Category, b.Category) 146 1 : }) 147 1 : return stats 148 : } 149 : 150 : // iterStatsAccumulator is a helper for a sstable iterator to accumulate 151 : // stats, which are reported to the CategoryStatsCollector when the 152 : // accumulator is closed. 153 : type iterStatsAccumulator struct { 154 : Category 155 : QoSLevel 156 : stats CategoryStats 157 : collector *CategoryStatsCollector 158 : } 159 : 160 : func (accum *iterStatsAccumulator) init( 161 : categoryAndQoS CategoryAndQoS, collector *CategoryStatsCollector, 162 1 : ) { 163 1 : accum.Category = categoryAndQoS.Category 164 1 : accum.QoSLevel = categoryAndQoS.QoSLevel 165 1 : accum.collector = collector 166 1 : } 167 : 168 : func (accum *iterStatsAccumulator) reportStats( 169 : blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration, 170 1 : ) { 171 1 : accum.stats.BlockBytes += blockBytes 172 1 : accum.stats.BlockBytesInCache += blockBytesInCache 173 1 : accum.stats.BlockReadDuration += blockReadDuration 174 1 : } 175 : 176 1 : func (accum *iterStatsAccumulator) close() { 177 1 : if accum.collector != nil { 178 1 : accum.collector.reportStats(accum.Category, accum.QoSLevel, accum.stats) 179 1 : } 180 : }