Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use 2 : // of this source code is governed by a BSD-style license that can be found in 3 : // the LICENSE file. 4 : 5 : package pebble 6 : 7 : import ( 8 : "sync" 9 : "time" 10 : ) 11 : 12 : // deletionPacerInfo contains any info from the db necessary to make deletion 13 : // pacing decisions (to limit background IO usage so that it does not contend 14 : // with foreground traffic). 15 : type deletionPacerInfo struct { 16 : freeBytes uint64 17 : obsoleteBytes uint64 18 : liveBytes uint64 19 : } 20 : 21 : // deletionPacer rate limits deletions of obsolete files. This is necessary to 22 : // prevent overloading the disk with too many deletions too quickly after a 23 : // large compaction, or an iterator close. On some SSDs, disk performance can be 24 : // negatively impacted if too many blocks are deleted very quickly, so this 25 : // mechanism helps mitigate that. 26 : type deletionPacer struct { 27 : // If there are less than freeSpaceThreshold bytes of free space on 28 : // disk, increase the pace of deletions such that we delete enough bytes to 29 : // get back to the threshold within the freeSpaceTimeframe. 30 : freeSpaceThreshold uint64 31 : freeSpaceTimeframe time.Duration 32 : 33 : // If the ratio of obsolete bytes to live bytes is greater than 34 : // obsoleteBytesMaxRatio, increase the pace of deletions such that we delete 35 : // enough bytes to get back to the threshold within the obsoleteBytesTimeframe. 36 : obsoleteBytesMaxRatio float64 37 : obsoleteBytesTimeframe time.Duration 38 : 39 : mu struct { 40 : sync.Mutex 41 : 42 : // history keeps rack of recent deletion history; it used to increase the 43 : // deletion rate to match the pace of deletions. 44 : history history 45 : } 46 : 47 : targetByteDeletionRate int64 48 : 49 : getInfo func() deletionPacerInfo 50 : } 51 : 52 : const deletePacerHistory = 5 * time.Minute 53 : 54 : // newDeletionPacer instantiates a new deletionPacer for use when deleting 55 : // obsolete files. 56 : // 57 : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to 58 : // normally limit deletes (when we are not falling behind or running out of 59 : // space). A value of 0.0 disables pacing. 60 : func newDeletionPacer( 61 : now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo, 62 1 : ) *deletionPacer { 63 1 : d := &deletionPacer{ 64 1 : freeSpaceThreshold: 16 << 30, // 16 GB 65 1 : freeSpaceTimeframe: 10 * time.Second, 66 1 : 67 1 : obsoleteBytesMaxRatio: 0.20, 68 1 : obsoleteBytesTimeframe: 5 * time.Minute, 69 1 : 70 1 : targetByteDeletionRate: targetByteDeletionRate, 71 1 : getInfo: getInfo, 72 1 : } 73 1 : d.mu.history.Init(now, deletePacerHistory) 74 1 : return d 75 1 : } 76 : 77 : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it 78 : // to keep track of the recent rate of deletions and potentially increase the 79 : // deletion rate accordingly. 80 : // 81 : // ReportDeletion is thread-safe. 82 1 : func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) { 83 1 : p.mu.Lock() 84 1 : defer p.mu.Unlock() 85 1 : p.mu.history.Add(now, int64(bytesToDelete)) 86 1 : } 87 : 88 : // PacingDelay returns the recommended pacing wait time (in seconds) for 89 : // deleting the given number of bytes. 90 : // 91 : // PacingDelay is thread-safe. 92 1 : func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) { 93 1 : if p.targetByteDeletionRate == 0 { 94 1 : // Pacing disabled. 95 1 : return 0.0 96 1 : } 97 : 98 1 : baseRate := float64(p.targetByteDeletionRate) 99 1 : // If recent deletion rate is more than our target, use that so that we don't 100 1 : // fall behind. 101 1 : historicRate := func() float64 { 102 1 : p.mu.Lock() 103 1 : defer p.mu.Unlock() 104 1 : return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds() 105 1 : }() 106 1 : if historicRate > baseRate { 107 0 : baseRate = historicRate 108 0 : } 109 : 110 : // Apply heuristics to increase the deletion rate. 111 1 : var extraRate float64 112 1 : info := p.getInfo() 113 1 : if info.freeBytes <= p.freeSpaceThreshold { 114 0 : // Increase the rate so that we can free up enough bytes within the timeframe. 115 0 : extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds() 116 0 : } 117 1 : if info.liveBytes == 0 { 118 1 : // We don't know the obsolete bytes ratio. Disable pacing altogether. 119 1 : return 0.0 120 1 : } 121 1 : obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes) 122 1 : if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio { 123 1 : // Increase the rate so that we can free up enough bytes within the timeframe. 124 1 : r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds() 125 1 : if extraRate < r { 126 1 : extraRate = r 127 1 : } 128 : } 129 : 130 1 : return float64(bytesToDelete) / (baseRate + extraRate) 131 : } 132 : 133 : // history is a helper used to keep track of the recent history of a set of 134 : // data points (in our case deleted bytes), at limited granularity. 135 : // Specifically, we split the desired timeframe into 100 "epochs" and all times 136 : // are effectively rounded down to the nearest epoch boundary. 137 : type history struct { 138 : epochDuration time.Duration 139 : startTime time.Time 140 : // currEpoch is the epoch of the most recent operation. 141 : currEpoch int64 142 : // val contains the recent epoch values. 143 : // val[currEpoch % historyEpochs] is the current epoch. 144 : // val[(currEpoch + 1) % historyEpochs] is the oldest epoch. 145 : val [historyEpochs]int64 146 : // sum is always equal to the sum of values in val. 147 : sum int64 148 : } 149 : 150 : const historyEpochs = 100 151 : 152 : // Init the history helper to keep track of data over the given number of 153 : // seconds. 154 1 : func (h *history) Init(now time.Time, timeframe time.Duration) { 155 1 : *h = history{ 156 1 : epochDuration: timeframe / time.Duration(historyEpochs), 157 1 : startTime: now, 158 1 : currEpoch: 0, 159 1 : sum: 0, 160 1 : } 161 1 : } 162 : 163 : // Add adds a value for the current time. 164 1 : func (h *history) Add(now time.Time, val int64) { 165 1 : h.advance(now) 166 1 : h.val[h.currEpoch%historyEpochs] += val 167 1 : h.sum += val 168 1 : } 169 : 170 : // Sum returns the sum of recent values. The result is approximate in that the 171 : // cut-off time is within 1% of the exact one. 172 1 : func (h *history) Sum(now time.Time) int64 { 173 1 : h.advance(now) 174 1 : return h.sum 175 1 : } 176 : 177 1 : func (h *history) epoch(t time.Time) int64 { 178 1 : return int64(t.Sub(h.startTime) / h.epochDuration) 179 1 : } 180 : 181 : // advance advances the time to the given time. 182 1 : func (h *history) advance(now time.Time) { 183 1 : epoch := h.epoch(now) 184 1 : for h.currEpoch < epoch { 185 1 : h.currEpoch++ 186 1 : // Forget the data for the oldest epoch. 187 1 : h.sum -= h.val[h.currEpoch%historyEpochs] 188 1 : h.val[h.currEpoch%historyEpochs] = 0 189 1 : } 190 : }