Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "sync"
9 : "time"
10 :
11 : "github.com/cockroachdb/crlib/crtime"
12 : )
13 :
// deletionPacerInfo contains any info from the db necessary to make deletion
// pacing decisions (to limit background IO usage so that it does not contend
// with foreground traffic).
type deletionPacerInfo struct {
	// freeBytes is compared against deletionPacer.freeSpaceThreshold; when it
	// is at or below the threshold, PacingDelay raises the deletion rate.
	freeBytes uint64
	// obsoleteBytes is the numerator of the obsolete-to-live ratio that
	// PacingDelay compares against deletionPacer.obsoleteBytesMaxRatio.
	obsoleteBytes uint64
	// liveBytes is the denominator of the obsolete-to-live ratio. PacingDelay
	// returns a zero delay when liveBytes is zero.
	liveBytes uint64
}
22 :
// deletionPacer rate limits deletions of obsolete files. This is necessary to
// prevent overloading the disk with too many deletions too quickly after a
// large compaction, or an iterator close. On some SSDs, disk performance can be
// negatively impacted if too many blocks are deleted very quickly, so this
// mechanism helps mitigate that.
type deletionPacer struct {
	// If there are less than freeSpaceThreshold bytes of free space on
	// disk, increase the pace of deletions such that we delete enough bytes to
	// get back to the threshold within the freeSpaceTimeframe.
	freeSpaceThreshold uint64
	freeSpaceTimeframe time.Duration

	// If the ratio of obsolete bytes to live bytes is greater than
	// obsoleteBytesMaxRatio, increase the pace of deletions such that we delete
	// enough bytes to get back to the threshold within the obsoleteBytesTimeframe.
	obsoleteBytesMaxRatio  float64
	obsoleteBytesTimeframe time.Duration

	// mu guards history; ReportDeletion and PacingDelay both take it before
	// touching history.
	mu struct {
		sync.Mutex

		// history keeps track of recent deletion history; it is used to
		// increase the deletion rate to match the pace of deletions.
		history history
	}

	// targetByteDeletionRate is the baseline deletion rate in bytes/sec. A
	// value of 0 disables pacing (PacingDelay returns 0).
	targetByteDeletionRate int64

	// getInfo is called by PacingDelay on each invocation to obtain the
	// current free, obsolete and live byte counts.
	getInfo func() deletionPacerInfo
}
53 :
// deletePacerHistory is the length of the window over which the pacer tracks
// recently reported deletions. It is the timeframe passed to history.Init and
// the divisor PacingDelay uses to turn the recent byte sum into a rate.
const deletePacerHistory = 5 * time.Minute
55 :
56 : // newDeletionPacer instantiates a new deletionPacer for use when deleting
57 : // obsolete files.
58 : //
59 : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to
60 : // normally limit deletes (when we are not falling behind or running out of
61 : // space). A value of 0.0 disables pacing.
62 : func newDeletionPacer(
63 : now crtime.Mono,
64 : freeSpaceThreshold uint64,
65 : targetByteDeletionRate int64,
66 : freeSpaceTimeframe time.Duration,
67 : obsoleteBytesMaxRatio float64,
68 : obsoleteBytesTimeframe time.Duration,
69 : getInfo func() deletionPacerInfo,
70 2 : ) *deletionPacer {
71 2 : d := &deletionPacer{
72 2 : freeSpaceThreshold: freeSpaceThreshold,
73 2 : freeSpaceTimeframe: freeSpaceTimeframe,
74 2 :
75 2 : obsoleteBytesMaxRatio: obsoleteBytesMaxRatio,
76 2 : obsoleteBytesTimeframe: obsoleteBytesTimeframe,
77 2 :
78 2 : targetByteDeletionRate: targetByteDeletionRate,
79 2 : getInfo: getInfo,
80 2 : }
81 2 :
82 2 : d.mu.history.Init(now, deletePacerHistory)
83 2 : return d
84 2 : }
85 :
86 : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it
87 : // to keep track of the recent rate of deletions and potentially increase the
88 : // deletion rate accordingly.
89 : //
90 : // ReportDeletion is thread-safe.
91 2 : func (p *deletionPacer) ReportDeletion(now crtime.Mono, bytesToDelete uint64) {
92 2 : p.mu.Lock()
93 2 : defer p.mu.Unlock()
94 2 : p.mu.history.Add(now, int64(bytesToDelete))
95 2 : }
96 :
97 : // PacingDelay returns the recommended pacing wait time (in seconds) for
98 : // deleting the given number of bytes.
99 : //
100 : // PacingDelay is thread-safe.
101 2 : func (p *deletionPacer) PacingDelay(now crtime.Mono, bytesToDelete uint64) (waitSeconds float64) {
102 2 : if p.targetByteDeletionRate == 0 {
103 2 : // Pacing disabled.
104 2 : return 0.0
105 2 : }
106 :
107 2 : baseRate := float64(p.targetByteDeletionRate)
108 2 : // If recent deletion rate is more than our target, use that so that we don't
109 2 : // fall behind.
110 2 : historicRate := func() float64 {
111 2 : p.mu.Lock()
112 2 : defer p.mu.Unlock()
113 2 : return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
114 2 : }()
115 2 : if historicRate > baseRate {
116 1 : baseRate = historicRate
117 1 : }
118 :
119 : // Apply heuristics to increase the deletion rate.
120 2 : var extraRate float64
121 2 : info := p.getInfo()
122 2 : if info.freeBytes <= p.freeSpaceThreshold {
123 1 : // Increase the rate so that we can free up enough bytes within the timeframe.
124 1 : extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds()
125 1 : }
126 2 : if info.liveBytes == 0 {
127 2 : // We don't know the obsolete bytes ratio. Disable pacing altogether.
128 2 : return 0.0
129 2 : }
130 2 : obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
131 2 : if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
132 2 : // Increase the rate so that we can free up enough bytes within the timeframe.
133 2 : r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()
134 2 : if extraRate < r {
135 2 : extraRate = r
136 2 : }
137 : }
138 :
139 2 : return float64(bytesToDelete) / (baseRate + extraRate)
140 : }
141 :
// history is a helper used to keep track of the recent history of a set of
// data points (in our case deleted bytes), at limited granularity.
// Specifically, we split the desired timeframe into 100 "epochs" and all times
// are effectively rounded down to the nearest epoch boundary.
type history struct {
	// epochDuration is the length of one epoch; Init sets it to the timeframe
	// divided by historyEpochs.
	epochDuration time.Duration
	// startTime is the time passed to Init; epoch numbers are counted from it.
	startTime crtime.Mono
	// currEpoch is the epoch of the most recent operation.
	currEpoch int64
	// val contains the recent epoch values.
	//   val[currEpoch % historyEpochs] is the current epoch.
	//   val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
	val [historyEpochs]int64
	// sum is always equal to the sum of values in val.
	sum int64
}
158 :
// historyEpochs is the number of epochs the history timeframe is divided
// into; it is also the length of the history.val ring buffer.
const historyEpochs = 100
160 :
161 : // Init the history helper to keep track of data over the given number of
162 : // seconds.
163 2 : func (h *history) Init(now crtime.Mono, timeframe time.Duration) {
164 2 : *h = history{
165 2 : epochDuration: timeframe / time.Duration(historyEpochs),
166 2 : startTime: now,
167 2 : currEpoch: 0,
168 2 : sum: 0,
169 2 : }
170 2 : }
171 :
172 : // Add adds a value for the current time.
173 2 : func (h *history) Add(now crtime.Mono, val int64) {
174 2 : h.advance(now)
175 2 : h.val[h.currEpoch%historyEpochs] += val
176 2 : h.sum += val
177 2 : }
178 :
179 : // Sum returns the sum of recent values. The result is approximate in that the
180 : // cut-off time is within 1% of the exact one.
181 2 : func (h *history) Sum(now crtime.Mono) int64 {
182 2 : h.advance(now)
183 2 : return h.sum
184 2 : }
185 :
186 2 : func (h *history) epoch(t crtime.Mono) int64 {
187 2 : return int64(t.Sub(h.startTime) / h.epochDuration)
188 2 : }
189 :
190 : // advance advances the time to the given time.
191 2 : func (h *history) advance(now crtime.Mono) {
192 2 : epoch := h.epoch(now)
193 2 : for h.currEpoch < epoch {
194 2 : h.currEpoch++
195 2 : // Forget the data for the oldest epoch.
196 2 : h.sum -= h.val[h.currEpoch%historyEpochs]
197 2 : h.val[h.currEpoch%historyEpochs] = 0
198 2 : }
199 : }
|