Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package tool
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "math"
12 : "math/rand"
13 : "slices"
14 : "sort"
15 : "strconv"
16 : "strings"
17 : "sync"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/pebble"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/spf13/cobra"
25 : )
26 :
// benchIO describes a single benchmark read: which object to read, at what
// offset, and how many bytes.
type benchIO struct {
	// readableIdx is the index of the target object in the readables slice
	// passed to performIOs.
	readableIdx int
	// ofs is the byte offset of the read within the object.
	ofs int64
	// size is the read size in bytes (at most maxIOSize).
	size int
	// elapsed time for the IO, filled out by performIOs.
	elapsed time.Duration
}
34 :
// maxIOSize is the largest supported IO size (1MB). Object sizes are measured
// in blocks of maxIOSize, and every benchmark IO size must be a divisor of it
// (enforced by parseIOSizes).
const maxIOSize = 1024 * 1024
36 :
37 : // runIOBench runs an IO benchmark against the current sstables of a database.
38 : // The workload is random IO, with various IO sizes. The main goal of the
39 : // benchmark is to establish the relationship between IO size and latency,
40 : // especially against shared object storage.
41 0 : func (d *dbT) runIOBench(cmd *cobra.Command, args []string) {
42 0 : stdout := cmd.OutOrStdout()
43 0 :
44 0 : ioSizes, err := parseIOSizes(d.ioSizes)
45 0 : if err != nil {
46 0 : fmt.Fprintf(stdout, "error parsing io-sizes: %s\n", err)
47 0 : return
48 0 : }
49 :
50 0 : db, err := d.openDB(args[0])
51 0 : if err != nil {
52 0 : fmt.Fprintf(stdout, "%s\n", err)
53 0 : return
54 0 : }
55 0 : defer d.closeDB(stdout, db)
56 0 :
57 0 : readables, err := d.openBenchTables(db)
58 0 : if err != nil {
59 0 : fmt.Fprintf(stdout, "%s\n", err)
60 0 : return
61 0 : }
62 :
63 0 : defer func() {
64 0 : for _, r := range readables {
65 0 : r.Close()
66 0 : }
67 : }()
68 :
69 0 : ios := genBenchIOs(stdout, readables, d.ioCount, ioSizes)
70 0 :
71 0 : levels := "L5,L6"
72 0 : if d.allLevels {
73 0 : levels = "all"
74 0 : }
75 0 : fmt.Fprintf(stdout, "IO count: %d Parallelism: %d Levels: %s\n", d.ioCount, d.ioParallelism, levels)
76 0 :
77 0 : var wg sync.WaitGroup
78 0 : wg.Add(d.ioParallelism)
79 0 : remainingIOs := ios
80 0 : for i := 0; i < d.ioParallelism; i++ {
81 0 : // We want to distribute the IOs among d.ioParallelism goroutines. At each
82 0 : // step, we look at the number of IOs remaining and take the average (across
83 0 : // the goroutines that are left); this deals with any rounding issues.
84 0 : n := len(remainingIOs) / (d.ioParallelism - i)
85 0 : go func(workerIdx int, ios []benchIO) {
86 0 : defer wg.Done()
87 0 : if err := performIOs(readables, ios); err != nil {
88 0 : fmt.Fprintf(stdout, "worker %d encountered error: %v", workerIdx, err)
89 0 : }
90 : }(i, remainingIOs[:n])
91 0 : remainingIOs = remainingIOs[n:]
92 : }
93 0 : wg.Wait()
94 0 :
95 0 : elapsed := make([]time.Duration, d.ioCount)
96 0 : for _, ioSize := range ioSizes {
97 0 : elapsed = elapsed[:0]
98 0 : for i := range ios {
99 0 : if ios[i].size == ioSize {
100 0 : elapsed = append(elapsed, ios[i].elapsed)
101 0 : }
102 : }
103 0 : fmt.Fprintf(stdout, "%4dKB -- %s\n", ioSize/1024, getStats(elapsed))
104 : }
105 : }
106 :
// genBenchIOs generates <count> IOs for each given size. All IOs (across all
// sizes) are in random order.
//
// sizes must be non-empty and sorted in increasing order (as produced by
// parseIOSizes); the last element is used as the largest IO size. Each
// readable must be at least maxIOSize bytes (guaranteed by openBenchTables),
// otherwise rand.Intn below would be called with a non-positive argument and
// panic.
func genBenchIOs(
	stdout io.Writer, readables []objstorage.Readable, count int, sizes []int,
) []benchIO {
	// size[i] is the size of the object, in blocks of maxIOSize.
	size := make([]int, len(readables))
	// sum[i] is the sum (size[0] + ... + size[i]).
	sum := make([]int, len(readables))
	total := 0
	for i, r := range readables {
		size[i] = int(r.Size() / maxIOSize)
		total += size[i]
		sum[i] = total
	}
	fmt.Fprintf(stdout, "Opened %d objects; total size %d MB.\n", len(readables), total*maxIOSize/(1024*1024))

	// To avoid a lot of overlap between the reads, the total size should be a
	// factor larger than the size we will actually read (for the largest IO
	// size).
	const sizeFactor = 2
	if total*maxIOSize < count*sizes[len(sizes)-1]*sizeFactor {
		fmt.Fprintf(stdout, "Warning: store too small for the given IO count and sizes.\n")
	}

	// Choose how many IOs we do for each object, by selecting a random block
	// across all file blocks.
	// The choice of objects will be the same across all IO sizes.
	b := make([]int, count)
	for i := range b {
		b[i] = rand.Intn(total)
	}
	// For each b[i], find the index such that sum[idx-1] <= b < sum[idx].
	// Sorting b makes this easier: we can "merge" the sorted arrays b and sum.
	sort.Ints(b)
	rIdx := make([]int, count)
	currIdx := 0
	for i := range b {
		// Advance currIdx until b[i] falls within object currIdx's block range.
		for b[i] >= sum[currIdx] {
			currIdx++
		}
		rIdx[i] = currIdx
	}

	// Generate one IO per (chosen object, IO size) pair.
	res := make([]benchIO, 0, count*len(sizes))
	for _, ioSize := range sizes {
		for _, idx := range rIdx {
			// Random ioSize aligned offset.
			ofs := ioSize * rand.Intn(size[idx]*maxIOSize/ioSize)

			res = append(res, benchIO{
				readableIdx: idx,
				ofs:         int64(ofs),
				size:        ioSize,
			})
		}
	}
	// Shuffle so that IOs of different sizes are interleaved.
	rand.Shuffle(len(res), func(i, j int) {
		res[i], res[j] = res[j], res[i]
	})
	return res
}
169 :
// openBenchTables opens the sstables for the benchmark and returns them as a
// list of Readables.
//
// By default, only L5/L6 sstables are used; all levels are used if the
// allLevels flag is set.
//
// Note that only sstables that are at least maxIOSize (1MB) are used.
//
// On success, the caller is responsible for closing the returned Readables.
func (d *dbT) openBenchTables(db *pebble.DB) ([]objstorage.Readable, error) {
	tables, err := db.SSTables()
	if err != nil {
		return nil, err
	}
	startLevel := 5
	if d.allLevels {
		startLevel = 0
	}

	// Collect the backing object numbers, deduplicating via numsMap
	// (presumably the same backing object can appear under multiple tables,
	// e.g. virtual sstables — the dedup map guards against opening it twice).
	var nums []base.DiskFileNum
	numsMap := make(map[base.DiskFileNum]struct{})
	for l := startLevel; l < len(tables); l++ {
		for _, t := range tables[l] {
			n := t.BackingSSTNum
			if _, ok := numsMap[n]; !ok {
				nums = append(nums, n)
				numsMap[n] = struct{}{}
			}
		}
	}

	p := db.ObjProvider()
	var res []objstorage.Readable
	for _, n := range nums {
		r, err := p.OpenForReading(context.Background(), base.FileTypeTable, n, objstorage.OpenOptions{})
		if err != nil {
			// Close everything opened so far before bailing out.
			for _, r := range res {
				_ = r.Close()
			}
			return nil, err
		}
		// Skip objects too small to support a maxIOSize read; genBenchIOs
		// relies on every readable being at least maxIOSize.
		if r.Size() < maxIOSize {
			_ = r.Close()
			continue
		}
		res = append(res, r)
	}
	if len(res) == 0 {
		return nil, errors.Errorf("no sstables (with size at least %d)", maxIOSize)
	}

	return res, nil
}
221 :
222 : // parseIOSizes parses a comma-separated list of IO sizes, in KB.
223 0 : func parseIOSizes(sizes string) ([]int, error) {
224 0 : var res []int
225 0 : for _, s := range strings.Split(sizes, ",") {
226 0 : n, err := strconv.Atoi(s)
227 0 : if err != nil {
228 0 : return nil, err
229 0 : }
230 0 : ioSize := n * 1024
231 0 : if ioSize > maxIOSize {
232 0 : return nil, errors.Errorf("IO sizes over %d not supported", maxIOSize)
233 0 : }
234 0 : if maxIOSize%ioSize != 0 {
235 0 : return nil, errors.Errorf("IO size must be a divisor of %d", maxIOSize)
236 0 : }
237 0 : res = append(res, ioSize)
238 : }
239 0 : if len(res) == 0 {
240 0 : return nil, errors.Errorf("no IO sizes specified")
241 0 : }
242 0 : sort.Ints(res)
243 0 : return res, nil
244 : }
245 :
// performIOs performs the given list of IOs and populates the elapsed fields.
//
// IOs are issued sequentially, each through a per-readable ReadHandle. Read
// errors do not stop the run: the first error is remembered and returned at
// the end (annotated with the count of any additional errors), and timing
// continues for the remaining IOs.
func performIOs(readables []objstorage.Readable, ios []benchIO) error {
	ctx := context.Background()
	rh := make([]objstorage.ReadHandle, len(readables))
	for i := range rh {
		rh[i] = readables[i].NewReadHandle(objstorage.NoReadBefore)
	}
	defer func() {
		for i := range rh {
			rh[i].Close()
		}
	}()

	// Single reusable buffer, large enough for any supported IO size.
	buf := make([]byte, maxIOSize)
	startTime := time.Now()
	var firstErr error
	var nOtherErrs int
	for i := range ios {
		if err := rh[ios[i].readableIdx].ReadAt(ctx, buf[:ios[i].size], ios[i].ofs); err != nil {
			if firstErr == nil {
				firstErr = err
			} else {
				nOtherErrs++
			}
		}
		// Attribute the time since the previous IO finished to this IO.
		endTime := time.Now()
		ios[i].elapsed = endTime.Sub(startTime)
		startTime = endTime
	}
	if nOtherErrs > 0 {
		return errors.Errorf("%v; plus %d more errors", firstErr, nOtherErrs)
	}
	return firstErr
}
280 :
281 : // getStats calculates various statistics given a list of elapsed times.
282 0 : func getStats(d []time.Duration) string {
283 0 : slices.Sort(d)
284 0 :
285 0 : factor := 1.0 / float64(len(d))
286 0 : var mean float64
287 0 : for i := range d {
288 0 : mean += float64(d[i]) * factor
289 0 : }
290 0 : var variance float64
291 0 : for i := range d {
292 0 : delta := float64(d[i]) - mean
293 0 : variance += delta * delta * factor
294 0 : }
295 :
296 0 : toStr := func(d time.Duration) string {
297 0 : if d < 10*time.Millisecond {
298 0 : return fmt.Sprintf("%1.2fms", float64(d)/float64(time.Millisecond))
299 0 : }
300 0 : if d < 100*time.Millisecond {
301 0 : return fmt.Sprintf("%2.1fms", float64(d)/float64(time.Millisecond))
302 0 : }
303 0 : return fmt.Sprintf("%4dms", d/time.Millisecond)
304 : }
305 :
306 0 : return fmt.Sprintf(
307 0 : "avg %s stddev %s p10 %s p50 %s p90 %s p95 %s p99 %s",
308 0 : toStr(time.Duration(mean)),
309 0 : toStr(time.Duration(math.Sqrt(variance))),
310 0 : toStr(d[len(d)*10/100]),
311 0 : toStr(d[len(d)*50/100]),
312 0 : toStr(d[len(d)*90/100]),
313 0 : toStr(d[len(d)*95/100]),
314 0 : toStr(d[len(d)*99/100]),
315 0 : )
316 : }
|