Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "cmp"
9 : "slices"
10 : "sync"
11 : "sync/atomic"
12 : "time"
13 : "unsafe"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/redact"
17 : )
18 :
// Category identifies a user-understandable category of reads; stats are
// aggregated per category. Each Category has a name and a QoSLevel, which are
// set when the category is registered. The cardinality of categories should be
// low, say < 20. The name prefix "pebble-" is reserved for internal Pebble
// categories.
//
// Examples of categories that can be useful in the CockroachDB context are:
// sql-user, sql-stats, raft, rangefeed, mvcc-gc, range-snapshot.
type Category uint8

const (
	// CategoryUnknown is the unknown category. It has the latency-sensitive QoS
	// level.
	CategoryUnknown Category = 0

	// CategoryMax is the maximum value of a category, and is also the maximum
	// number of categories that can be registered.
	CategoryMax = 30
)
34 :
35 1 : func (c Category) String() string {
36 1 : return categories[c].name
37 1 : }
38 :
39 : // QoSLevel returns the QoSLevel associated with this Category.
40 1 : func (c Category) QoSLevel() QoSLevel {
41 1 : return categories[c].qosLevel
42 1 : }
43 :
44 : // SafeFormat implements the redact.SafeFormatter interface.
45 0 : func (c Category) SafeFormat(p redact.SafePrinter, verb rune) {
46 0 : p.SafeString(redact.SafeString(c.String()))
47 0 : }
48 :
49 : // RegisterCategory registers a new category. Each category has a name and an
50 : // associated QoS level. The category name must be unique.
51 : //
52 : // Only CategoryMax categories can be registered in total.
53 1 : func RegisterCategory(name string, qosLevel QoSLevel) Category {
54 1 : if categoriesList != nil {
55 0 : panic("ReigsterCategory called after Categories()")
56 : }
57 1 : c := Category(numRegisteredCategories.Add(1))
58 1 : if c > CategoryMax {
59 0 : panic("too many categories")
60 : }
61 1 : categories[c].name = name
62 1 : categories[c].qosLevel = qosLevel
63 1 : return c
64 : }
65 :
66 : // Categories returns all registered categories, including CategoryUnknown.
67 : //
68 : // Can only be called after all categories have been registered. Calling
69 : // RegisterCategory() after Categories() will result in a panic.
70 0 : func Categories() []Category {
71 0 : categoriesListOnce.Do(func() {
72 0 : categoriesList = make([]Category, numRegisteredCategories.Load()+1)
73 0 : for i := range categoriesList {
74 0 : categoriesList[i] = Category(i)
75 0 : }
76 : })
77 0 : return categoriesList
78 : }
79 :
80 : var categories = [CategoryMax + 1]struct {
81 : name string
82 : qosLevel QoSLevel
83 : }{
84 : CategoryUnknown: {name: "unknown", qosLevel: LatencySensitiveQoSLevel},
85 : }
86 :
87 : var numRegisteredCategories atomic.Uint32
88 :
89 : var categoriesList []Category
90 : var categoriesListOnce sync.Once
91 :
92 : // StringToCategoryForTesting returns the Category for the string, or panics if
93 : // the string is not known.
94 1 : func StringToCategoryForTesting(s string) Category {
95 1 : for i := range categories {
96 1 : if categories[i].name == s {
97 1 : return Category(i)
98 1 : }
99 : }
100 0 : panic(errors.AssertionFailedf("unknown Category %s", s))
101 : }
102 :
// QoSLevel describes whether a read is latency-sensitive or not. Each category
// maps to exactly one QoSLevel. Category names are opaque to Pebble, but the
// QoSLevel may be used internally by Pebble to better optimize future reads.
type QoSLevel uint8

const (
	// LatencySensitiveQoSLevel is the zero value, and hence the default when
	// no QoSLevel is specified. It represents reads that are
	// latency-sensitive.
	LatencySensitiveQoSLevel QoSLevel = 0
	// NonLatencySensitiveQoSLevel represents reads that are not
	// latency-sensitive.
	NonLatencySensitiveQoSLevel QoSLevel = 1
)
117 :
118 : // SafeFormat implements the redact.SafeFormatter interface.
119 1 : func (q QoSLevel) SafeFormat(p redact.SafePrinter, verb rune) {
120 1 : switch q {
121 1 : case LatencySensitiveQoSLevel:
122 1 : p.Printf("latency")
123 1 : case NonLatencySensitiveQoSLevel:
124 1 : p.Printf("non-latency")
125 0 : default:
126 0 : p.Printf("<unknown-qos>")
127 : }
128 : }
129 :
130 : // StringToQoSForTesting returns the QoSLevel for the string, or panics if the
131 : // string is not known.
132 0 : func StringToQoSForTesting(s string) QoSLevel {
133 0 : switch s {
134 0 : case "latency":
135 0 : return LatencySensitiveQoSLevel
136 0 : case "non-latency":
137 0 : return NonLatencySensitiveQoSLevel
138 : }
139 0 : panic(errors.AssertionFailedf("unknown QoS %s", s))
140 : }
141 :
// CategoryStats provides stats about a category of reads.
type CategoryStats struct {
	// BlockBytes is the number of bytes in the loaded blocks; for compressed
	// blocks this counts the compressed bytes. Currently only index blocks,
	// data blocks containing points, and filter blocks are included. Value
	// blocks read after the corresponding iterator is closed are not
	// included either.
	BlockBytes uint64
	// BlockBytesInCache is the part of BlockBytes that was served from the
	// block cache.
	BlockBytesInCache uint64
	// BlockReadDuration is the total time spent reading the bytes that were
	// not in the cache, i.e. BlockBytes-BlockBytesInCache.
	BlockReadDuration time.Duration
}

// aggregate adds every counter in other to the corresponding counter in s.
func (s *CategoryStats) aggregate(other CategoryStats) {
	s.BlockReadDuration += other.BlockReadDuration
	s.BlockBytesInCache += other.BlockBytesInCache
	s.BlockBytes += other.BlockBytes
}
163 :
// CategoryStatsAggregate is the aggregate for the given category, as returned
// (one per category) by CategoryStatsCollector.GetStats.
type CategoryStatsAggregate struct {
	// Category is the category these stats belong to.
	Category      Category
	// CategoryStats holds the stats summed across all of the category's
	// shards.
	CategoryStats CategoryStats
}
169 :
// numCategoryStatsShards is the number of shards each category's stats are
// split across. It must be a power of two, because
// CategoryStatsCollector.Accumulator selects a shard by masking with
// numCategoryStatsShards-1.
const numCategoryStatsShards = 1 << 4
171 :
// categoryStatsWithMu is a mutex-protected CategoryStats, used as a single
// shard of shardedCategoryStats. Its size is used to compute the per-shard
// padding in shardedCategoryStats, so it must not grow beyond 64 bytes (the
// padding array length would become negative and fail to compile).
type categoryStatsWithMu struct {
	mu sync.Mutex
	// Protected by mu.
	stats CategoryStats
}
177 :
178 : // Accumulate implements the IterStatsAccumulator interface.
179 1 : func (c *categoryStatsWithMu) Accumulate(stats CategoryStats) {
180 1 : c.mu.Lock()
181 1 : c.stats.aggregate(stats)
182 1 : c.mu.Unlock()
183 1 : }
184 :
// CategoryStatsCollector collects and aggregates the stats per category.
// Its fields are a sync.Mutex and a sync.Map, both usable at their zero
// values, so the zero value of the collector is ready to use. It contains a
// mutex and must not be copied after first use.
type CategoryStatsCollector struct {
	// mu protects additions to statsMap; lookups of existing entries go
	// through statsMap without taking mu.
	mu sync.Mutex
	// Category => *shardedCategoryStats.
	statsMap sync.Map
}
192 :
// shardedCategoryStats accumulates stats for a category, splitting its stats
// across multiple shards to prevent mutex contention. In high-read workloads,
// contention on the category stats mutex has been observed.
//
// Writers add to a single shard chosen by CategoryStatsCollector.Accumulator;
// getStats sums over all shards.
type shardedCategoryStats struct {
	Category Category
	shards   [numCategoryStatsShards]struct {
		categoryStatsWithMu
		// Pad each shard to 64 bytes so they don't share a cache line.
		// NOTE(review): this makes each shard exactly 64 bytes, but the
		// shards array follows the Category field and the struct is not
		// explicitly 64-byte aligned, so the data of two adjacent shards may
		// still fall in one cache line — confirm whether that matters.
		_ [64 - unsafe.Sizeof(categoryStatsWithMu{})]byte
	}
}
204 :
205 : // getStats retrieves the aggregated stats for the category, summing across all
206 : // shards.
207 1 : func (s *shardedCategoryStats) getStats() CategoryStatsAggregate {
208 1 : agg := CategoryStatsAggregate{
209 1 : Category: s.Category,
210 1 : }
211 1 : for i := range s.shards {
212 1 : s.shards[i].mu.Lock()
213 1 : agg.CategoryStats.aggregate(s.shards[i].stats)
214 1 : s.shards[i].mu.Unlock()
215 1 : }
216 1 : return agg
217 : }
218 :
219 : // Accumulator returns a stats accumulator for the given category. The provided
220 : // p is used to detrmine which shard to write stats to.
221 1 : func (c *CategoryStatsCollector) Accumulator(p uint64, category Category) IterStatsAccumulator {
222 1 : v, ok := c.statsMap.Load(category)
223 1 : if !ok {
224 1 : c.mu.Lock()
225 1 : v, _ = c.statsMap.LoadOrStore(category, &shardedCategoryStats{
226 1 : Category: category,
227 1 : })
228 1 : c.mu.Unlock()
229 1 : }
230 1 : s := v.(*shardedCategoryStats)
231 1 : // This equation is taken from:
232 1 : // https://en.wikipedia.org/wiki/Linear_congruential_generator#Parameters_in_common_use
233 1 : shard := ((p * 25214903917) >> 32) & (numCategoryStatsShards - 1)
234 1 : return &s.shards[shard].categoryStatsWithMu
235 : }
236 :
237 : // GetStats returns the aggregated stats.
238 1 : func (c *CategoryStatsCollector) GetStats() []CategoryStatsAggregate {
239 1 : var stats []CategoryStatsAggregate
240 1 : c.statsMap.Range(func(_, v any) bool {
241 1 : s := v.(*shardedCategoryStats).getStats()
242 1 : stats = append(stats, s)
243 1 : return true
244 1 : })
245 1 : slices.SortFunc(stats, func(a, b CategoryStatsAggregate) int {
246 1 : return cmp.Compare(a.Category, b.Category)
247 1 : })
248 1 : return stats
249 : }
250 :
251 : type IterStatsAccumulator interface {
252 : // Accumulate accumulates the provided stats.
253 : Accumulate(cas CategoryStats)
254 : }
255 :
// iterStatsAccumulator is a helper for a sstable iterator to accumulate stats
// locally. The stats are reported to the parent IterStatsAccumulator (e.g. one
// obtained from CategoryStatsCollector.Accumulator) when the accumulator is
// closed.
type iterStatsAccumulator struct {
	// stats holds the running totals added via reportStats.
	stats  CategoryStats
	// parent receives stats on close; if nil, the stats are dropped.
	parent IterStatsAccumulator
}
263 :
264 1 : func (a *iterStatsAccumulator) init(parent IterStatsAccumulator) {
265 1 : a.parent = parent
266 1 : }
267 :
268 : func (a *iterStatsAccumulator) reportStats(
269 : blockBytes, blockBytesInCache uint64, blockReadDuration time.Duration,
270 1 : ) {
271 1 : a.stats.BlockBytes += blockBytes
272 1 : a.stats.BlockBytesInCache += blockBytesInCache
273 1 : a.stats.BlockReadDuration += blockReadDuration
274 1 : }
275 :
276 1 : func (a *iterStatsAccumulator) close() {
277 1 : if a.parent != nil {
278 1 : a.parent.Accumulate(a.stats)
279 1 : }
280 : }
|