Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package sstable 6 : 7 : import ( 8 : "cmp" 9 : "slices" 10 : "sync" 11 : 12 : "github.com/cockroachdb/errors" 13 : "github.com/cockroachdb/redact" 14 : ) 15 : 16 : // Category is a user-understandable string, where stats are aggregated for 17 : // each category. The cardinality of this should be low, say < 20. The prefix 18 : // "pebble-" is reserved for internal Pebble categories. 19 : // 20 : // Examples of categories that can be useful in the CockroachDB context are: 21 : // sql-user, sql-stats, raft, rangefeed, mvcc-gc, range-snapshot. 22 : type Category string 23 : 24 : // QoSLevel describes whether the read is latency-sensitive or not. Each 25 : // category must map to a single QoSLevel. While category strings are opaque 26 : // to Pebble, the QoSLevel may be internally utilized in Pebble to better 27 : // optimize future reads. 28 : type QoSLevel int 29 : 30 : const ( 31 : // LatencySensitiveQoSLevel is the default when QoSLevel is not specified, 32 : // and represents reads that are latency-sensitive. 33 : LatencySensitiveQoSLevel QoSLevel = iota 34 : // NonLatencySensitiveQoSLevel represents reads that are not 35 : // latency-sensitive. 36 : NonLatencySensitiveQoSLevel 37 : ) 38 : 39 : // SafeFormat implements the redact.SafeFormatter interface. 40 1 : func (q QoSLevel) SafeFormat(p redact.SafePrinter, verb rune) { 41 1 : switch q { 42 1 : case LatencySensitiveQoSLevel: 43 1 : p.Printf("latency") 44 1 : case NonLatencySensitiveQoSLevel: 45 1 : p.Printf("non-latency") 46 0 : default: 47 0 : p.Printf("<unknown-qos>") 48 : } 49 : } 50 : 51 : // StringToQoSForTesting returns the QoSLevel for the string, or panics if the 52 : // string is not known. 53 1 : func StringToQoSForTesting(s string) QoSLevel { 54 1 : switch s { 55 1 : case "latency": 56 1 : return LatencySensitiveQoSLevel 57 1 : case "non-latency": 58 1 : return NonLatencySensitiveQoSLevel 59 : } 60 0 : panic(errors.AssertionFailedf("unknown QoS %s", s)) 61 : } 62 : 63 : // CategoryAndQoS specifies both the Category and the QoSLevel. 64 : type CategoryAndQoS struct { 65 : Category 66 : QoSLevel 67 : } 68 : 69 : // CategoryStats provides stats about a category of reads. 70 : type CategoryStats struct { 71 : // BlockBytes is the bytes in the loaded blocks. If the block was 72 : // compressed, this is the compressed bytes. Currently, only the index 73 : // blocks, data blocks containing points, and filter blocks are included. 74 : // Additionally, value blocks read after the corresponding iterator is 75 : // closed are not included. 76 : BlockBytes uint64 77 : // BlockBytesInCache is the subset of BlockBytes that were in the block 78 : // cache. 79 : BlockBytesInCache uint64 80 : } 81 : 82 1 : func (s *CategoryStats) aggregate(a CategoryStats) { 83 1 : s.BlockBytes += a.BlockBytes 84 1 : s.BlockBytesInCache += a.BlockBytesInCache 85 1 : } 86 : 87 : // CategoryStatsAggregate is the aggregate for the given category. 88 : type CategoryStatsAggregate struct { 89 : Category 90 : QoSLevel 91 : CategoryStats 92 : } 93 : 94 : type categoryStatsWithMu struct { 95 : mu sync.Mutex 96 : // Protected by mu. 97 : stats CategoryStatsAggregate 98 : } 99 : 100 : // CategoryStatsCollector collects and aggregates the stats per category. 101 : type CategoryStatsCollector struct { 102 : // mu protects additions to statsMap. 103 : mu sync.Mutex 104 : // Category => categoryStatsWithMu. 105 : statsMap sync.Map 106 : } 107 : 108 : func (c *CategoryStatsCollector) reportStats( 109 : category Category, qosLevel QoSLevel, stats CategoryStats, 110 1 : ) { 111 1 : v, ok := c.statsMap.Load(category) 112 1 : if !ok { 113 1 : c.mu.Lock() 114 1 : v, _ = c.statsMap.LoadOrStore(category, &categoryStatsWithMu{ 115 1 : stats: CategoryStatsAggregate{Category: category, QoSLevel: qosLevel}, 116 1 : }) 117 1 : c.mu.Unlock() 118 1 : } 119 1 : aggStats := v.(*categoryStatsWithMu) 120 1 : aggStats.mu.Lock() 121 1 : aggStats.stats.CategoryStats.aggregate(stats) 122 1 : aggStats.mu.Unlock() 123 : } 124 : 125 : // GetStats returns the aggregated stats. 126 1 : func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate { 127 1 : var stats []CategoryStatsAggregate 128 1 : c.statsMap.Range(func(_, v any) bool { 129 1 : aggStats := v.(*categoryStatsWithMu) 130 1 : aggStats.mu.Lock() 131 1 : s := aggStats.stats 132 1 : aggStats.mu.Unlock() 133 1 : if len(s.Category) == 0 { 134 1 : s.Category = "_unknown" 135 1 : } 136 1 : stats = append(stats, s) 137 1 : return true 138 : }) 139 1 : slices.SortFunc(stats, func(a, b CategoryStatsAggregate) int { 140 1 : return cmp.Compare(a.Category, b.Category) 141 1 : }) 142 1 : return stats 143 : } 144 : 145 : // iterStatsAccumulator is a helper for a sstable iterator to accumulate 146 : // stats, which are reported to the CategoryStatsCollector when the 147 : // accumulator is closed. 148 : type iterStatsAccumulator struct { 149 : Category 150 : QoSLevel 151 : stats CategoryStats 152 : collector *CategoryStatsCollector 153 : } 154 : 155 : func (accum *iterStatsAccumulator) init( 156 : categoryAndQoS CategoryAndQoS, collector *CategoryStatsCollector, 157 1 : ) { 158 1 : accum.Category = categoryAndQoS.Category 159 1 : accum.QoSLevel = categoryAndQoS.QoSLevel 160 1 : accum.collector = collector 161 1 : } 162 : 163 1 : func (accum *iterStatsAccumulator) reportStats(blockBytes, blockBytesInCache uint64) { 164 1 : accum.stats.BlockBytes += blockBytes 165 1 : accum.stats.BlockBytesInCache += blockBytesInCache 166 1 : } 167 : 168 1 : func (accum *iterStatsAccumulator) close() { 169 1 : if accum.collector != nil { 170 1 : accum.collector.reportStats(accum.Category, accum.QoSLevel, accum.stats) 171 1 : } 172 : }