Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package pebble 6 : 7 : import ( 8 : "sync" 9 : "time" 10 : ) 11 : 12 : // deletionPacerInfo contains any info from the db necessary to make deletion 13 : // pacing decisions (to limit background IO usage so that it does not contend 14 : // with foreground traffic). 15 : type deletionPacerInfo struct { 16 : freeBytes uint64 17 : obsoleteBytes uint64 18 : liveBytes uint64 19 : } 20 : 21 : // deletionPacer rate limits deletions of obsolete files. This is necessary to 22 : // prevent overloading the disk with too many deletions too quickly after a 23 : // large compaction, or an iterator close. On some SSDs, disk performance can be 24 : // negatively impacted if too many blocks are deleted very quickly, so this 25 : // mechanism helps mitigate that. 26 : type deletionPacer struct { 27 : // If there are less than freeSpaceThreshold bytes of free space on 28 : // disk, increase the pace of deletions such that we delete enough bytes to 29 : // get back to the threshold within the freeSpaceTimeframe. 30 : freeSpaceThreshold uint64 31 : freeSpaceTimeframe time.Duration 32 : 33 : // If the ratio of obsolete bytes to live bytes is greater than 34 : // obsoleteBytesMaxRatio, increase the pace of deletions such that we delete 35 : // enough bytes to get back to the threshold within the obsoleteBytesTimeframe. 36 : obsoleteBytesMaxRatio float64 37 : obsoleteBytesTimeframe time.Duration 38 : 39 : mu struct { 40 : sync.Mutex 41 : 42 : // history keeps rack of recent deletion history; it used to increase the 43 : // deletion rate to match the pace of deletions. 44 : history history 45 : } 46 : 47 : targetByteDeletionRate int64 48 : 49 : getInfo func() deletionPacerInfo 50 : } 51 : 52 : const deletePacerHistory = 5 * time.Minute 53 : 54 : // newDeletionPacer instantiates a new deletionPacer for use when deleting 55 : // obsolete files. 56 : // 57 : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to 58 : // normally limit deletes (when we are not falling behind or running out of 59 : // space). A value of 0.0 disables pacing. 60 : func newDeletionPacer( 61 : now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo, 62 2 : ) *deletionPacer { 63 2 : d := &deletionPacer{ 64 2 : freeSpaceThreshold: 16 << 30, // 16 GB 65 2 : freeSpaceTimeframe: 10 * time.Second, 66 2 : 67 2 : obsoleteBytesMaxRatio: 0.20, 68 2 : obsoleteBytesTimeframe: 5 * time.Minute, 69 2 : 70 2 : targetByteDeletionRate: targetByteDeletionRate, 71 2 : getInfo: getInfo, 72 2 : } 73 2 : d.mu.history.Init(now, deletePacerHistory) 74 2 : return d 75 2 : } 76 : 77 : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it 78 : // to keep track of the recent rate of deletions and potentially increase the 79 : // deletion rate accordingly. 80 : // 81 : // ReportDeletion is thread-safe. 82 2 : func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) { 83 2 : p.mu.Lock() 84 2 : defer p.mu.Unlock() 85 2 : p.mu.history.Add(now, int64(bytesToDelete)) 86 2 : } 87 : 88 : // PacingDelay returns the recommended pacing wait time (in seconds) for 89 : // deleting the given number of bytes. 90 : // 91 : // PacingDelay is thread-safe. 92 2 : func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) { 93 2 : if p.targetByteDeletionRate == 0 { 94 2 : // Pacing disabled. 95 2 : return 0.0 96 2 : } 97 : 98 2 : baseRate := float64(p.targetByteDeletionRate) 99 2 : // If recent deletion rate is more than our target, use that so that we don't 100 2 : // fall behind. 101 2 : historicRate := func() float64 { 102 2 : p.mu.Lock() 103 2 : defer p.mu.Unlock() 104 2 : return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds() 105 2 : }() 106 2 : if historicRate > baseRate { 107 1 : baseRate = historicRate 108 1 : } 109 : 110 : // Apply heuristics to increase the deletion rate. 111 2 : var extraRate float64 112 2 : info := p.getInfo() 113 2 : if info.freeBytes <= p.freeSpaceThreshold { 114 1 : // Increase the rate so that we can free up enough bytes within the timeframe. 115 1 : extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds() 116 1 : } 117 2 : if info.liveBytes == 0 { 118 2 : // We don't know the obsolete bytes ratio. Disable pacing altogether. 119 2 : return 0.0 120 2 : } 121 2 : obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes) 122 2 : if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio { 123 2 : // Increase the rate so that we can free up enough bytes within the timeframe. 124 2 : r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds() 125 2 : if extraRate < r { 126 2 : extraRate = r 127 2 : } 128 : } 129 : 130 2 : return float64(bytesToDelete) / (baseRate + extraRate) 131 : } 132 : 133 : // history is a helper used to keep track of the recent history of a set of 134 : // data points (in our case deleted bytes), at limited granularity. 135 : // Specifically, we split the desired timeframe into 100 "epochs" and all times 136 : // are effectively rounded down to the nearest epoch boundary. 137 : type history struct { 138 : epochDuration time.Duration 139 : startTime time.Time 140 : // currEpoch is the epoch of the most recent operation. 141 : currEpoch int64 142 : // val contains the recent epoch values. 143 : // val[currEpoch % historyEpochs] is the current epoch. 144 : // val[(currEpoch + 1) % historyEpochs] is the oldest epoch. 145 : val [historyEpochs]int64 146 : // sum is always equal to the sum of values in val. 147 : sum int64 148 : } 149 : 150 : const historyEpochs = 100 151 : 152 : // Init the history helper to keep track of data over the given number of 153 : // seconds. 154 2 : func (h *history) Init(now time.Time, timeframe time.Duration) { 155 2 : *h = history{ 156 2 : epochDuration: timeframe / time.Duration(historyEpochs), 157 2 : startTime: now, 158 2 : currEpoch: 0, 159 2 : sum: 0, 160 2 : } 161 2 : } 162 : 163 : // Add adds a value for the current time. 164 2 : func (h *history) Add(now time.Time, val int64) { 165 2 : h.advance(now) 166 2 : h.val[h.currEpoch%historyEpochs] += val 167 2 : h.sum += val 168 2 : } 169 : 170 : // Sum returns the sum of recent values. The result is approximate in that the 171 : // cut-off time is within 1% of the exact one. 172 2 : func (h *history) Sum(now time.Time) int64 { 173 2 : h.advance(now) 174 2 : return h.sum 175 2 : } 176 : 177 2 : func (h *history) epoch(t time.Time) int64 { 178 2 : return int64(t.Sub(h.startTime) / h.epochDuration) 179 2 : } 180 : 181 : // advance advances the time to the given time. 182 2 : func (h *history) advance(now time.Time) { 183 2 : epoch := h.epoch(now) 184 2 : for h.currEpoch < epoch { 185 2 : h.currEpoch++ 186 2 : // Forget the data for the oldest epoch. 187 2 : h.sum -= h.val[h.currEpoch%historyEpochs] 188 2 : h.val[h.currEpoch%historyEpochs] = 0 189 2 : } 190 : }