Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : // Package datatest provides common datadriven test commands for use outside of 6 : // the root Pebble package. 7 : package datatest 8 : 9 : import ( 10 : "strings" 11 : "sync" 12 : 13 : "github.com/cockroachdb/datadriven" 14 : "github.com/cockroachdb/errors" 15 : "github.com/cockroachdb/pebble" 16 : ) 17 : 18 : // TODO(jackson): Consider a refactoring that can consolidate this package and 19 : // the datadriven commands defined in pebble/data_test.go. 20 : 21 : // DefineBatch interprets the provided datadriven command as a sequence of write 22 : // operations, one-per-line, to apply to the provided batch. 23 1 : func DefineBatch(d *datadriven.TestData, b *pebble.Batch) error { 24 1 : for _, line := range strings.Split(d.Input, "\n") { 25 1 : parts := strings.Fields(line) 26 1 : if len(parts) == 0 { 27 0 : continue 28 : } 29 1 : if parts[1] == `<nil>` { 30 0 : parts[1] = "" 31 0 : } 32 1 : var err error 33 1 : switch parts[0] { 34 1 : case "set": 35 1 : if len(parts) != 3 { 36 0 : return errors.Errorf("%s expects 2 arguments", parts[0]) 37 0 : } 38 1 : err = b.Set([]byte(parts[1]), []byte(parts[2]), nil) 39 1 : case "del": 40 1 : if len(parts) != 2 { 41 0 : return errors.Errorf("%s expects 1 argument", parts[0]) 42 0 : } 43 1 : err = b.Delete([]byte(parts[1]), nil) 44 1 : case "singledel": 45 1 : if len(parts) != 2 { 46 0 : return errors.Errorf("%s expects 1 argument", parts[0]) 47 0 : } 48 1 : err = b.SingleDelete([]byte(parts[1]), nil) 49 1 : case "del-range": 50 1 : if len(parts) != 3 { 51 0 : return errors.Errorf("%s expects 2 arguments", parts[0]) 52 0 : } 53 1 : err = b.DeleteRange([]byte(parts[1]), []byte(parts[2]), nil) 54 0 : case "merge": 55 0 : if len(parts) != 3 { 56 0 : return errors.Errorf("%s expects 2 arguments", parts[0]) 57 0 : } 58 0 : err = b.Merge([]byte(parts[1]), []byte(parts[2]), nil) 59 1 : case "range-key-set": 60 1 : if len(parts) != 5 { 61 0 : return errors.Errorf("%s expects 4 arguments", parts[0]) 62 0 : } 63 1 : err = b.RangeKeySet( 64 1 : []byte(parts[1]), 65 1 : []byte(parts[2]), 66 1 : []byte(parts[3]), 67 1 : []byte(parts[4]), 68 1 : nil) 69 1 : case "range-key-unset": 70 1 : if len(parts) != 4 { 71 0 : return errors.Errorf("%s expects 3 arguments", parts[0]) 72 0 : } 73 1 : err = b.RangeKeyUnset( 74 1 : []byte(parts[1]), 75 1 : []byte(parts[2]), 76 1 : []byte(parts[3]), 77 1 : nil) 78 1 : case "range-key-del": 79 1 : if len(parts) != 3 { 80 0 : return errors.Errorf("%s expects 2 arguments", parts[0]) 81 0 : } 82 1 : err = b.RangeKeyDelete( 83 1 : []byte(parts[1]), 84 1 : []byte(parts[2]), 85 1 : nil) 86 0 : default: 87 0 : return errors.Errorf("unknown op: %s", parts[0]) 88 : } 89 1 : if err != nil { 90 0 : return err 91 0 : } 92 : } 93 1 : return nil 94 : } 95 : 96 : // CompactionTracker is a listener that tracks the number of compactions. 97 : type CompactionTracker struct { 98 : sync.Cond 99 : count int 100 : attached bool 101 : } 102 : 103 : // NewCompactionTracker setups the necessary options to keep track of the 104 : // compactions that are in flight. 105 1 : func NewCompactionTracker(options *pebble.Options) *CompactionTracker { 106 1 : ct := CompactionTracker{} 107 1 : ct.Cond = sync.Cond{ 108 1 : L: &sync.Mutex{}, 109 1 : } 110 1 : ct.attached = true 111 1 : el := pebble.EventListener{ 112 1 : CompactionEnd: func(info pebble.CompactionInfo) { 113 1 : ct.L.Lock() 114 1 : ct.count-- 115 1 : ct.Broadcast() 116 1 : ct.L.Unlock() 117 1 : }, 118 1 : CompactionBegin: func(info pebble.CompactionInfo) { 119 1 : ct.L.Lock() 120 1 : ct.count++ 121 1 : ct.Broadcast() 122 1 : ct.L.Unlock() 123 1 : }, 124 : } 125 : 126 1 : options.AddEventListener(el) 127 1 : return &ct 128 : } 129 : 130 : // WaitForInflightCompactionsToEqual waits until compactions meet the specified target. 131 1 : func (cql *CompactionTracker) WaitForInflightCompactionsToEqual(target int) { 132 1 : cql.L.Lock() 133 1 : if !cql.attached { 134 0 : panic("Cannot wait for compactions if listener has not been attached") 135 : } 136 1 : for cql.count != target { 137 0 : cql.Wait() 138 0 : } 139 1 : cql.L.Unlock() 140 : }