Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "runtime/pprof"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/tokenbucket"
18 : )
19 :
20 : // Cleaner exports the base.Cleaner type.
21 : type Cleaner = base.Cleaner
22 :
23 : // DeleteCleaner exports the base.DeleteCleaner type.
24 : type DeleteCleaner = base.DeleteCleaner
25 :
26 : // ArchiveCleaner exports the base.ArchiveCleaner type.
27 : type ArchiveCleaner = base.ArchiveCleaner
28 :
29 : type cleanupManager struct {
30 : opts *Options
31 : objProvider objstorage.Provider
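// onTableDeleteFn is invoked with the size of each table deleted by the
// background goroutine; the callback updates deleted-table metrics
// (acquiring DB.mu, see Wait).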
32 : onTableDeleteFn func(fileSize uint64)
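// deletePacer rate-limits deletion of obsolete sstables based on
// Options.TargetByteDeletionRate (see maybePace).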
33 : deletePacer *deletionPacer
34 :
35 : // jobsCh is used as the cleanup job queue.
36 : jobsCh chan *cleanupJob
37 : // waitGroup is used to wait for the background goroutine to exit.
38 : waitGroup sync.WaitGroup
39 :
40 : mu struct {
41 : sync.Mutex
42 : // totalJobs is the total number of enqueued jobs (completed or in progress).
43 : totalJobs int
44 : completedJobs int
45 : completedJobsCond sync.Cond
46 : jobsQueueWarningIssued bool
47 : }
48 : }
49 :
50 : // We can queue this many jobs before we have to block EnqueueJob.
51 : const jobsQueueDepth = 1000
52 :
53 : // obsoleteFile holds information about a file that needs to be deleted soon.
54 : type obsoleteFile struct {
55 : dir string
56 : fileNum base.DiskFileNum
57 : fileType fileType
58 : fileSize uint64
59 : }
60 :
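// cleanupJob is a single unit of work for the background goroutine: the ID of
// the originating job and the obsolete files it should delete.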
61 : type cleanupJob struct {
62 : jobID int
63 : obsoleteFiles []obsoleteFile
64 : }
65 :
66 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
67 : // The cleanupManager must be Close()d.
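//
// A minimal lifecycle sketch (illustrative only; opts, provider, onTableDelete,
// getPacerInfo, jobID and obsoleteFiles are placeholders supplied by the
// surrounding DB, not names defined here):
//
//	cm := openCleanupManager(opts, provider, onTableDelete, getPacerInfo)
//	cm.EnqueueJob(jobID, obsoleteFiles) // queue obsolete files for deletion
//	cm.Wait()                           // block until the queued job completes
//	cm.Close()                          // drain remaining jobs and stop the goroutine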
68 : func openCleanupManager(
69 : opts *Options,
70 : objProvider objstorage.Provider,
71 : onTableDeleteFn func(fileSize uint64),
72 : getDeletePacerInfo func() deletionPacerInfo,
73 1 : ) *cleanupManager {
74 1 : cm := &cleanupManager{
75 1 : opts: opts,
76 1 : objProvider: objProvider,
77 1 : onTableDeleteFn: onTableDeleteFn,
78 1 : deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
79 1 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
80 1 : }
81 1 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
82 1 : cm.waitGroup.Add(1)
83 1 :
84 1 : go func() {
85 1 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
86 1 : cm.mainLoop()
87 1 : })
88 : }()
89 :
90 1 : return cm
91 : }
92 :
93 : // Close stops the background goroutine, waiting until all queued jobs are completed.
94 : // Delete pacing is disabled for the remaining jobs.
95 1 : func (cm *cleanupManager) Close() {
96 1 : close(cm.jobsCh)
97 1 : cm.waitGroup.Wait()
98 1 : }
99 :
100 : // EnqueueJob adds a cleanup job to the manager's queue.
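// EnqueueJob blocks if the queue (of size jobsQueueDepth) is full.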
101 1 : func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
102 1 : job := &cleanupJob{
103 1 : jobID: jobID,
104 1 : obsoleteFiles: obsoleteFiles,
105 1 : }
106 1 :
107 1 : // Report deleted bytes to the pacer, which can use this data to potentially
108 1 : // increase the deletion rate to keep up. We want to do this at enqueue time
109 1 : // rather than when we get to the job, otherwise the reported bytes will be
110 1 : // subject to the throttling rate which defeats the purpose.
111 1 : var pacingBytes uint64
112 1 : for _, of := range obsoleteFiles {
113 1 : if cm.needsPacing(of.fileType, of.fileNum) {
114 1 : pacingBytes += of.fileSize
115 1 : }
116 : }
117 1 : if pacingBytes > 0 {
118 1 : cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
119 1 : }
120 :
121 1 : cm.mu.Lock()
122 1 : cm.mu.totalJobs++
123 1 : cm.maybeLogLocked()
124 1 : cm.mu.Unlock()
125 1 :
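// In invariants builds, treat a nearly full queue as a bug: it indicates that
// jobs are not being drained and that EnqueueJob is about to block.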
126 1 : if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 {
127 0 : panic("cleanup jobs queue full")
128 : }
129 :
130 1 : cm.jobsCh <- job
131 : }
132 :
133 : // Wait until the completion of all jobs that were already queued.
134 : //
135 : // Does not wait for jobs that are enqueued during the call.
136 : //
137 : // Note that DB.mu should not be held while calling this method; the background
138 : // goroutine needs to acquire DB.mu to update deleted table metrics.
139 1 : func (cm *cleanupManager) Wait() {
140 1 : cm.mu.Lock()
141 1 : defer cm.mu.Unlock()
142 1 : n := cm.mu.totalJobs
143 1 : for cm.mu.completedJobs < n {
144 1 : cm.mu.completedJobsCond.Wait()
145 1 : }
146 : }
147 :
148 : // mainLoop runs the manager's background goroutine.
149 1 : func (cm *cleanupManager) mainLoop() {
150 1 : defer cm.waitGroup.Done()
151 1 :
152 1 : var tb tokenbucket.TokenBucket
153 1 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
154 1 : tb.Init(1.0, 1.0)
155 1 : for job := range cm.jobsCh {
156 1 : for _, of := range job.obsoleteFiles {
157 1 : if of.fileType != fileTypeTable {
158 1 : path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
159 1 : cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
160 1 : } else {
161 1 : cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
162 1 : cm.onTableDeleteFn(of.fileSize)
163 1 : cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
164 1 : }
165 : }
166 1 : cm.mu.Lock()
167 1 : cm.mu.completedJobs++
168 1 : cm.mu.completedJobsCond.Broadcast()
169 1 : cm.maybeLogLocked()
170 1 : cm.mu.Unlock()
171 : }
172 : }
173 :
174 1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
175 1 : if fileType != fileTypeTable {
176 1 : return false
177 1 : }
178 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
179 1 : if err != nil {
180 0 : // The object was already removed from the provider; we won't actually
181 0 : // delete anything, so we don't need to pace.
182 0 : return false
183 0 : }
184 : // Don't throttle deletion of remote objects.
185 1 : return !meta.IsRemote()
186 : }
187 :
188 : // maybePace sleeps before deleting an object if appropriate. It is always
189 : // called from the background goroutine.
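//
// As a rough illustration (numbers not taken from this code): with
// Options.TargetByteDeletionRate set to 128 MB/s, deleting a 64 MB sstable
// translates to roughly 0.5 tokens, i.e. about half a second of delay at the
// bucket's 1 token/second refill rate.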
190 : func (cm *cleanupManager) maybePace(
191 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
192 1 : ) {
193 1 : if !cm.needsPacing(fileType, fileNum) {
194 1 : return
195 1 : }
196 :
197 1 : tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
198 1 : if tokens == 0.0 {
199 1 : // The token bucket might be in debt; it could make us wait even for 0
200 1 : // tokens. We don't want that if the pacer decided throttling should be
201 1 : // disabled.
202 1 : return
203 1 : }
204 : // Wait for tokens. We use a token bucket instead of sleeping outright because
205 : // the token bucket accumulates up to one second of unused tokens.
206 1 : for {
207 1 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
208 1 : if ok {
209 1 : break
210 : }
211 0 : time.Sleep(d)
212 : }
213 : }
214 :
215 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
216 : func (cm *cleanupManager) deleteObsoleteFile(
217 : fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
218 1 : ) {
219 1 : // TODO(peter): need to handle this error, probably by re-adding the
220 1 : // file that couldn't be deleted to one of the obsolete slices map.
221 1 : err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path)
222 1 : if oserror.IsNotExist(err) {
223 0 : return
224 0 : }
225 :
226 1 : switch fileType {
227 1 : case fileTypeLog:
228 1 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
229 1 : JobID: jobID,
230 1 : Path: path,
231 1 : FileNum: fileNum.FileNum(),
232 1 : Err: err,
233 1 : })
234 1 : case fileTypeManifest:
235 1 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
236 1 : JobID: jobID,
237 1 : Path: path,
238 1 : FileNum: fileNum.FileNum(),
239 1 : Err: err,
240 1 : })
241 0 : case fileTypeTable:
242 0 : panic("invalid deletion of object file")
243 : }
244 : }
245 :
246 : func (cm *cleanupManager) deleteObsoleteObject(
247 : fileType fileType, jobID int, fileNum base.DiskFileNum,
248 1 : ) {
249 1 : if fileType != fileTypeTable {
250 0 : panic("not an object")
251 : }
252 :
253 1 : var path string
254 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
255 1 : if err != nil {
256 0 : path = "<nil>"
257 1 : } else {
258 1 : path = cm.objProvider.Path(meta)
259 1 : err = cm.objProvider.Remove(fileType, fileNum)
260 1 : }
261 1 : if cm.objProvider.IsNotExistError(err) {
262 0 : return
263 0 : }
264 :
265 1 : switch fileType {
266 1 : case fileTypeTable:
267 1 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
268 1 : JobID: jobID,
269 1 : Path: path,
270 1 : FileNum: fileNum.FileNum(),
271 1 : Err: err,
272 1 : })
273 : }
274 : }
275 :
276 : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
277 : // full, and a follow-up log once the queue drains back below 10% full.
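// (With jobsQueueDepth = 1000, the warning is issued above 750 queued jobs and
// the all-clear is logged once the queue drops back below 100.)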
278 : //
279 : // Must be called with cm.mu locked.
280 1 : func (cm *cleanupManager) maybeLogLocked() {
281 1 : const highThreshold = jobsQueueDepth * 3 / 4
282 1 : const lowThreshold = jobsQueueDepth / 10
283 1 :
284 1 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
285 1 :
286 1 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
287 0 : cm.mu.jobsQueueWarningIssued = true
288 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
289 0 : }
290 :
291 1 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
292 0 : cm.mu.jobsQueueWarningIssued = false
293 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
294 0 : }
295 : }