Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package pebble 6 : 7 : import ( 8 : "sync" 9 : "time" 10 : 11 : "github.com/cockroachdb/crlib/crtime" 12 : ) 13 : 14 : // deletionPacerInfo contains any info from the db necessary to make deletion 15 : // pacing decisions (to limit background IO usage so that it does not contend 16 : // with foreground traffic). 17 : type deletionPacerInfo struct { 18 : freeBytes uint64 19 : obsoleteBytes uint64 20 : liveBytes uint64 21 : } 22 : 23 : // deletionPacer rate limits deletions of obsolete files. This is necessary to 24 : // prevent overloading the disk with too many deletions too quickly after a 25 : // large compaction, or an iterator close. On some SSDs, disk performance can be 26 : // negatively impacted if too many blocks are deleted very quickly, so this 27 : // mechanism helps mitigate that. 28 : type deletionPacer struct { 29 : // If there are less than freeSpaceThreshold bytes of free space on 30 : // disk, increase the pace of deletions such that we delete enough bytes to 31 : // get back to the threshold within the freeSpaceTimeframe. 32 : freeSpaceThreshold uint64 33 : freeSpaceTimeframe time.Duration 34 : 35 : // If the ratio of obsolete bytes to live bytes is greater than 36 : // obsoleteBytesMaxRatio, increase the pace of deletions such that we delete 37 : // enough bytes to get back to the threshold within the obsoleteBytesTimeframe. 38 : obsoleteBytesMaxRatio float64 39 : obsoleteBytesTimeframe time.Duration 40 : 41 : mu struct { 42 : sync.Mutex 43 : 44 : // history keeps rack of recent deletion history; it used to increase the 45 : // deletion rate to match the pace of deletions. 46 : history history 47 : } 48 : 49 : targetByteDeletionRate int64 50 : 51 : getInfo func() deletionPacerInfo 52 : } 53 : 54 : const deletePacerHistory = 5 * time.Minute 55 : 56 : // newDeletionPacer instantiates a new deletionPacer for use when deleting 57 : // obsolete files. 58 : // 59 : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to 60 : // normally limit deletes (when we are not falling behind or running out of 61 : // space). A value of 0.0 disables pacing. 62 : func newDeletionPacer( 63 : now crtime.Mono, targetByteDeletionRate int64, getInfo func() deletionPacerInfo, 64 1 : ) *deletionPacer { 65 1 : d := &deletionPacer{ 66 1 : freeSpaceThreshold: 16 << 30, // 16 GB 67 1 : freeSpaceTimeframe: 10 * time.Second, 68 1 : 69 1 : obsoleteBytesMaxRatio: 0.20, 70 1 : obsoleteBytesTimeframe: 5 * time.Minute, 71 1 : 72 1 : targetByteDeletionRate: targetByteDeletionRate, 73 1 : getInfo: getInfo, 74 1 : } 75 1 : d.mu.history.Init(now, deletePacerHistory) 76 1 : return d 77 1 : } 78 : 79 : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it 80 : // to keep track of the recent rate of deletions and potentially increase the 81 : // deletion rate accordingly. 82 : // 83 : // ReportDeletion is thread-safe. 84 1 : func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) { 85 1 : p.mu.Lock() 86 1 : defer p.mu.Unlock() 87 1 : p.mu.history.Add(now, int64(bytesToDelete)) 88 1 : } 89 : 90 : // PacingDelay returns the recommended pacing wait time (in seconds) for 91 : // deleting the given number of bytes. 92 : // 93 : // PacingDelay is thread-safe. 94 1 : func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (waitSeconds float64) { 95 1 : if p.targetByteDeletionRate == 0 { 96 1 : // Pacing disabled. 97 1 : return 0.0 98 1 : } 99 : 100 1 : baseRate := float64(p.targetByteDeletionRate) 101 1 : // If recent deletion rate is more than our target, use that so that we don't 102 1 : // fall behind. 103 1 : historicRate := func() float64 { 104 1 : p.mu.Lock() 105 1 : defer p.mu.Unlock() 106 1 : return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds() 107 1 : }() 108 1 : if historicRate > baseRate { 109 1 : baseRate = historicRate 110 1 : } 111 : 112 : // Apply heuristics to increase the deletion rate. 113 1 : var extraRate float64 114 1 : info := p.getInfo() 115 1 : if info.freeBytes <= p.freeSpaceThreshold { 116 1 : // Increase the rate so that we can free up enough bytes within the timeframe. 117 1 : extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds() 118 1 : } 119 1 : if info.liveBytes == 0 { 120 1 : // We don't know the obsolete bytes ratio. Disable pacing altogether. 121 1 : return 0.0 122 1 : } 123 1 : obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes) 124 1 : if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio { 125 1 : // Increase the rate so that we can free up enough bytes within the timeframe. 126 1 : r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds() 127 1 : if extraRate < r { 128 1 : extraRate = r 129 1 : } 130 : } 131 : 132 1 : return float64(bytesToDelete) / (baseRate + extraRate) 133 : } 134 : 135 : // history is a helper used to keep track of the recent history of a set of 136 : // data points (in our case deleted bytes), at limited granularity. 137 : // Specifically, we split the desired timeframe into 100 "epochs" and all times 138 : // are effectively rounded down to the nearest epoch boundary. 139 : type history struct { 140 : epochDuration time.Duration 141 : startTime crtime.Mono 142 : // currEpoch is the epoch of the most recent operation. 143 : currEpoch int64 144 : // val contains the recent epoch values. 145 : // val[currEpoch % historyEpochs] is the current epoch. 146 : // val[(currEpoch + 1) % historyEpochs] is the oldest epoch. 147 : val [historyEpochs]int64 148 : // sum is always equal to the sum of values in val. 149 : sum int64 150 : } 151 : 152 : const historyEpochs = 100 153 : 154 : // Init the history helper to keep track of data over the given number of 155 : // seconds. 156 1 : func (h *history) Init(now crtime.Mono, timeframe time.Duration) { 157 1 : *h = history{ 158 1 : epochDuration: timeframe / time.Duration(historyEpochs), 159 1 : startTime: now, 160 1 : currEpoch: 0, 161 1 : sum: 0, 162 1 : } 163 1 : } 164 : 165 : // Add adds a value for the current time. 166 1 : func (h *history) Add(now crtime.Mono, val int64) { 167 1 : h.advance(now) 168 1 : h.val[h.currEpoch%historyEpochs] += val 169 1 : h.sum += val 170 1 : } 171 : 172 : // Sum returns the sum of recent values. The result is approximate in that the 173 : // cut-off time is within 1% of the exact one. 174 1 : func (h *history) Sum(now crtime.Mono) int64 { 175 1 : h.advance(now) 176 1 : return h.sum 177 1 : } 178 : 179 1 : func (h *history) epoch(t crtime.Mono) int64 { 180 1 : return int64(t.Sub(h.startTime) / h.epochDuration) 181 1 : } 182 : 183 : // advance advances the time to the given time. 184 1 : func (h *history) advance(now crtime.Mono) { 185 1 : epoch := h.epoch(now) 186 1 : for h.currEpoch < epoch { 187 1 : h.currEpoch++ 188 1 : // Forget the data for the oldest epoch. 189 1 : h.sum -= h.val[h.currEpoch%historyEpochs] 190 1 : h.val[h.currEpoch%historyEpochs] = 0 191 1 : } 192 : }