Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "sync"
9 : "time"
10 :
11 : "github.com/cockroachdb/crlib/crtime"
12 : )
13 :
// deletionPacerInfo is the snapshot of database/disk state that the deletion
// pacer consults when deciding how fast obsolete files may be removed. Pacing
// exists to keep background deletion IO from contending with foreground
// traffic.
type deletionPacerInfo struct {
	// freeBytes is compared against the pacer's free-space threshold; when it
	// is at or below the threshold, deletions are sped up.
	freeBytes uint64
	// obsoleteBytes is the numerator of the obsolete-to-live ratio used to
	// detect a growing deletion backlog.
	obsoleteBytes uint64
	// liveBytes is the denominator of the obsolete-to-live ratio. When it is
	// zero the ratio is unknown and the pacer does not delay deletions.
	liveBytes uint64
}
22 :
23 : // deletionPacer rate limits deletions of obsolete files. This is necessary to
24 : // prevent overloading the disk with too many deletions too quickly after a
25 : // large compaction, or an iterator close. On some SSDs, disk performance can be
26 : // negatively impacted if too many blocks are deleted very quickly, so this
27 : // mechanism helps mitigate that.
28 : type deletionPacer struct {
29 : // If there are less than freeSpaceThreshold bytes of free space on
30 : // disk, increase the pace of deletions such that we delete enough bytes to
31 : // get back to the threshold within the freeSpaceTimeframe.
32 : freeSpaceThreshold uint64
33 : freeSpaceTimeframe time.Duration
34 :
35 : // If the ratio of obsolete bytes to live bytes is greater than
36 : // obsoleteBytesMaxRatio, increase the pace of deletions such that we delete
37 : // enough bytes to get back to the threshold within the obsoleteBytesTimeframe.
38 : obsoleteBytesMaxRatio float64
39 : obsoleteBytesTimeframe time.Duration
40 :
41 : mu struct {
42 : sync.Mutex
43 :
44 : // history keeps rack of recent deletion history; it used to increase the
45 : // deletion rate to match the pace of deletions.
46 : history history
47 : }
48 :
49 : targetByteDeletionRate int64
50 :
51 : getInfo func() deletionPacerInfo
52 : }
53 :
// deletePacerHistory is the length of the window over which recently reported
// deletions are remembered when estimating the recent deletion rate.
const deletePacerHistory time.Duration = 5 * time.Minute
55 :
56 : // newDeletionPacer instantiates a new deletionPacer for use when deleting
57 : // obsolete files.
58 : //
59 : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to
60 : // normally limit deletes (when we are not falling behind or running out of
61 : // space). A value of 0.0 disables pacing.
62 : func newDeletionPacer(
63 : now crtime.Mono,
64 : freeSpaceThreshold uint64,
65 : targetByteDeletionRate int64,
66 : freeSpaceTimeframe time.Duration,
67 : obsoleteBytesMaxRatio float64,
68 : obsoleteBytesTimeframe time.Duration,
69 : getInfo func() deletionPacerInfo,
70 1 : ) *deletionPacer {
71 1 : d := &deletionPacer{
72 1 : freeSpaceThreshold: freeSpaceThreshold,
73 1 : freeSpaceTimeframe: freeSpaceTimeframe,
74 1 :
75 1 : obsoleteBytesMaxRatio: obsoleteBytesMaxRatio,
76 1 : obsoleteBytesTimeframe: obsoleteBytesTimeframe,
77 1 :
78 1 : targetByteDeletionRate: targetByteDeletionRate,
79 1 : getInfo: getInfo,
80 1 : }
81 1 :
82 1 : d.mu.history.Init(now, deletePacerHistory)
83 1 : return d
84 1 : }
85 :
86 : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it
87 : // to keep track of the recent rate of deletions and potentially increase the
88 : // deletion rate accordingly.
89 : //
90 : // ReportDeletion is thread-safe.
91 1 : func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) {
92 1 : p.mu.Lock()
93 1 : defer p.mu.Unlock()
94 1 : p.mu.history.Add(now, int64(bytesToDelete))
95 1 : }
96 :
97 : // PacingDelay returns the recommended pacing wait time (in seconds) for
98 : // deleting the given number of bytes.
99 : //
100 : // PacingDelay is thread-safe.
101 1 : func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (waitSeconds float64) {
102 1 : if p.targetByteDeletionRate == 0 {
103 1 : // Pacing disabled.
104 1 : return 0.0
105 1 : }
106 :
107 1 : baseRate := float64(p.targetByteDeletionRate)
108 1 : // If recent deletion rate is more than our target, use that so that we don't
109 1 : // fall behind.
110 1 : historicRate := func() float64 {
111 1 : p.mu.Lock()
112 1 : defer p.mu.Unlock()
113 1 : return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
114 1 : }()
115 1 : if historicRate > baseRate {
116 0 : baseRate = historicRate
117 0 : }
118 :
119 : // Apply heuristics to increase the deletion rate.
120 1 : var extraRate float64
121 1 : info := p.getInfo()
122 1 : if info.freeBytes <= p.freeSpaceThreshold {
123 0 : // Increase the rate so that we can free up enough bytes within the timeframe.
124 0 : extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds()
125 0 : }
126 1 : if info.liveBytes == 0 {
127 1 : // We don't know the obsolete bytes ratio. Disable pacing altogether.
128 1 : return 0.0
129 1 : }
130 1 : obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
131 1 : if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
132 1 : // Increase the rate so that we can free up enough bytes within the timeframe.
133 1 : r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()
134 1 : if extraRate < r {
135 1 : extraRate = r
136 1 : }
137 : }
138 :
139 1 : return float64(bytesToDelete) / (baseRate + extraRate)
140 : }
141 :
142 : // history is a helper used to keep track of the recent history of a set of
143 : // data points (in our case deleted bytes), at limited granularity.
144 : // Specifically, we split the desired timeframe into 100 "epochs" and all times
145 : // are effectively rounded down to the nearest epoch boundary.
146 : type history struct {
147 : epochDuration time.Duration
148 : startTime crtime.Mono
149 : // currEpoch is the epoch of the most recent operation.
150 : currEpoch int64
151 : // val contains the recent epoch values.
152 : // val[currEpoch % historyEpochs] is the current epoch.
153 : // val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
154 : val [historyEpochs]int64
155 : // sum is always equal to the sum of values in val.
156 : sum int64
157 : }
158 :
// historyEpochs is the number of epochs the history window is divided into,
// and therefore the number of slots in the history ring buffer.
const historyEpochs = 100
160 :
161 : // Init the history helper to keep track of data over the given number of
162 : // seconds.
163 1 : func (h *history) Init(now crtime.Mono, timeframe time.Duration) {
164 1 : *h = history{
165 1 : epochDuration: timeframe / time.Duration(historyEpochs),
166 1 : startTime: now,
167 1 : currEpoch: 0,
168 1 : sum: 0,
169 1 : }
170 1 : }
171 :
172 : // Add adds a value for the current time.
173 1 : func (h *history) Add(now crtime.Mono, val int64) {
174 1 : h.advance(now)
175 1 : h.val[h.currEpoch%historyEpochs] += val
176 1 : h.sum += val
177 1 : }
178 :
179 : // Sum returns the sum of recent values. The result is approximate in that the
180 : // cut-off time is within 1% of the exact one.
181 1 : func (h *history) Sum(now crtime.Mono) int64 {
182 1 : h.advance(now)
183 1 : return h.sum
184 1 : }
185 :
186 1 : func (h *history) epoch(t crtime.Mono) int64 {
187 1 : return int64(t.Sub(h.startTime) / h.epochDuration)
188 1 : }
189 :
190 : // advance advances the time to the given time.
191 1 : func (h *history) advance(now crtime.Mono) {
192 1 : epoch := h.epoch(now)
193 1 : for h.currEpoch < epoch {
194 1 : h.currEpoch++
195 1 : // Forget the data for the oldest epoch.
196 1 : h.sum -= h.val[h.currEpoch%historyEpochs]
197 1 : h.val[h.currEpoch%historyEpochs] = 0
198 1 : }
199 : }
|