Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "runtime/pprof"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/cockroachdb/pebble/wal"
18 : "github.com/cockroachdb/tokenbucket"
19 : )
20 :
// Cleaner exports the base.Cleaner type. These aliases let users reference
// the cleaner types without importing internal/base directly.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
type ArchiveCleaner = base.ArchiveCleaner
29 :
// cleanupManager deletes obsolete files from a background goroutine, pacing
// deletions of local tables via deletePacer.
type cleanupManager struct {
	opts        *Options
	objProvider objstorage.Provider
	// onTableDeleteFn is invoked (from the background goroutine) for each
	// deleted table, with its size and whether it was stored locally.
	onTableDeleteFn func(fileSize uint64, isLocal bool)
	// deletePacer throttles deletions of local tables; see maybePace.
	deletePacer *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs int
		// completedJobs counts jobs fully processed by mainLoop.
		completedJobs int
		// completedJobsCond is signaled on each job completion; Wait blocks on it.
		// Its Locker is the mutex above (set in openCleanupManager).
		completedJobsCond sync.Cond
		// jobsQueueWarningIssued tracks whether the "falling behind" warning in
		// maybeLogLocked is currently active, to avoid repeated logging.
		jobsQueueWarningIssued bool
	}
}
50 :
// We can queue this many jobs before we have to block EnqueueJob.
// maybeLogLocked warns when the queue nears this depth.
const jobsQueueDepth = 1000
53 :
// deletableFile is used for non log files.
type deletableFile struct {
	// dir is the directory containing the file; mainLoop joins it with
	// fileNum via base.MakeFilepath to form the full path.
	dir      string
	fileNum  base.DiskFileNum
	fileSize uint64
	// isLocal is forwarded to the table-delete callback (see mainLoop).
	isLocal bool
}
61 :
// obsoleteFile holds information about a file that needs to be deleted soon.
// Exactly one of nonLogFile/logFile is meaningful, selected by fileType.
type obsoleteFile struct {
	fileType fileType
	// nonLogFile is populated when fileType != fileTypeLog.
	nonLogFile deletableFile
	// logFile is populated when fileType == fileTypeLog.
	logFile wal.DeletableLog
}
70 :
// cleanupJob is a unit of work for the cleanup manager: delete the given
// obsolete files on behalf of the given job ID.
type cleanupJob struct {
	jobID         JobID
	obsoleteFiles []obsoleteFile
}
75 :
76 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
77 : // The cleanupManager must be Close()d.
78 : func openCleanupManager(
79 : opts *Options,
80 : objProvider objstorage.Provider,
81 : onTableDeleteFn func(fileSize uint64, isLocal bool),
82 : getDeletePacerInfo func() deletionPacerInfo,
83 2 : ) *cleanupManager {
84 2 : cm := &cleanupManager{
85 2 : opts: opts,
86 2 : objProvider: objProvider,
87 2 : onTableDeleteFn: onTableDeleteFn,
88 2 : deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
89 2 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
90 2 : }
91 2 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
92 2 : cm.waitGroup.Add(1)
93 2 :
94 2 : go func() {
95 2 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
96 2 : cm.mainLoop()
97 2 : })
98 : }()
99 :
100 2 : return cm
101 : }
102 :
// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
func (cm *cleanupManager) Close() {
	// Closing jobsCh makes mainLoop's range terminate once it has drained
	// all queued jobs; Wait blocks until that goroutine returns.
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}
109 :
110 : // EnqueueJob adds a cleanup job to the manager's queue.
111 2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
112 2 : job := &cleanupJob{
113 2 : jobID: jobID,
114 2 : obsoleteFiles: obsoleteFiles,
115 2 : }
116 2 :
117 2 : // Report deleted bytes to the pacer, which can use this data to potentially
118 2 : // increase the deletion rate to keep up. We want to do this at enqueue time
119 2 : // rather than when we get to the job, otherwise the reported bytes will be
120 2 : // subject to the throttling rate which defeats the purpose.
121 2 : var pacingBytes uint64
122 2 : for _, of := range obsoleteFiles {
123 2 : if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
124 2 : pacingBytes += of.nonLogFile.fileSize
125 2 : }
126 : }
127 2 : if pacingBytes > 0 {
128 2 : cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
129 2 : }
130 :
131 2 : cm.mu.Lock()
132 2 : cm.mu.totalJobs++
133 2 : cm.maybeLogLocked()
134 2 : cm.mu.Unlock()
135 2 :
136 2 : cm.jobsCh <- job
137 : }
138 :
139 : // Wait until the completion of all jobs that were already queued.
140 : //
141 : // Does not wait for jobs that are enqueued during the call.
142 : //
143 : // Note that DB.mu should not be held while calling this method; the background
144 : // goroutine needs to acquire DB.mu to update deleted table metrics.
145 2 : func (cm *cleanupManager) Wait() {
146 2 : cm.mu.Lock()
147 2 : defer cm.mu.Unlock()
148 2 : n := cm.mu.totalJobs
149 2 : for cm.mu.completedJobs < n {
150 2 : cm.mu.completedJobsCond.Wait()
151 2 : }
152 : }
153 :
154 : // mainLoop runs the manager's background goroutine.
155 2 : func (cm *cleanupManager) mainLoop() {
156 2 : defer cm.waitGroup.Done()
157 2 :
158 2 : var tb tokenbucket.TokenBucket
159 2 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
160 2 : tb.Init(1.0, 1.0)
161 2 : for job := range cm.jobsCh {
162 2 : for _, of := range job.obsoleteFiles {
163 2 : switch of.fileType {
164 2 : case fileTypeTable:
165 2 : cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
166 2 : cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
167 2 : cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
168 2 : case fileTypeLog:
169 2 : cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
170 2 : base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
171 2 : default:
172 2 : path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
173 2 : cm.deleteObsoleteFile(
174 2 : cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
175 : }
176 : }
177 2 : cm.mu.Lock()
178 2 : cm.mu.completedJobs++
179 2 : cm.mu.completedJobsCond.Broadcast()
180 2 : cm.maybeLogLocked()
181 2 : cm.mu.Unlock()
182 : }
183 : }
184 :
185 : // fileNumIfSST is read iff fileType is fileTypeTable.
186 2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
187 2 : if fileType != fileTypeTable {
188 2 : return false
189 2 : }
190 2 : meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
191 2 : if err != nil {
192 1 : // The object was already removed from the provider; we won't actually
193 1 : // delete anything, so we don't need to pace.
194 1 : return false
195 1 : }
196 : // Don't throttle deletion of remote objects.
197 2 : return !meta.IsRemote()
198 : }
199 :
200 : // maybePace sleeps before deleting an object if appropriate. It is always
201 : // called from the background goroutine.
202 : func (cm *cleanupManager) maybePace(
203 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
204 2 : ) {
205 2 : if !cm.needsPacing(fileType, fileNum) {
206 2 : return
207 2 : }
208 :
209 2 : tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
210 2 : if tokens == 0.0 {
211 2 : // The token bucket might be in debt; it could make us wait even for 0
212 2 : // tokens. We don't want that if the pacer decided throttling should be
213 2 : // disabled.
214 2 : return
215 2 : }
216 : // Wait for tokens. We use a token bucket instead of sleeping outright because
217 : // the token bucket accumulates up to one second of unused tokens.
218 2 : for {
219 2 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
220 2 : if ok {
221 2 : break
222 : }
223 0 : time.Sleep(d)
224 : }
225 : }
226 :
227 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
228 : func (cm *cleanupManager) deleteObsoleteFile(
229 : fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
230 2 : ) {
231 2 : // TODO(peter): need to handle this error, probably by re-adding the
232 2 : // file that couldn't be deleted to one of the obsolete slices map.
233 2 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
234 2 : if oserror.IsNotExist(err) {
235 0 : return
236 0 : }
237 :
238 2 : switch fileType {
239 2 : case fileTypeLog:
240 2 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
241 2 : JobID: int(jobID),
242 2 : Path: path,
243 2 : FileNum: fileNum,
244 2 : Err: err,
245 2 : })
246 2 : case fileTypeManifest:
247 2 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
248 2 : JobID: int(jobID),
249 2 : Path: path,
250 2 : FileNum: fileNum,
251 2 : Err: err,
252 2 : })
253 0 : case fileTypeTable:
254 0 : panic("invalid deletion of object file")
255 : }
256 : }
257 :
258 : func (cm *cleanupManager) deleteObsoleteObject(
259 : fileType fileType, jobID JobID, fileNum base.DiskFileNum,
260 2 : ) {
261 2 : if fileType != fileTypeTable {
262 0 : panic("not an object")
263 : }
264 :
265 2 : var path string
266 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
267 2 : if err != nil {
268 1 : path = "<nil>"
269 2 : } else {
270 2 : path = cm.objProvider.Path(meta)
271 2 : err = cm.objProvider.Remove(fileType, fileNum)
272 2 : }
273 2 : if cm.objProvider.IsNotExistError(err) {
274 1 : return
275 1 : }
276 :
277 2 : switch fileType {
278 2 : case fileTypeTable:
279 2 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
280 2 : JobID: int(jobID),
281 2 : Path: path,
282 2 : FileNum: fileNum,
283 2 : Err: err,
284 2 : })
285 : }
286 : }
287 :
288 : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
289 : // when the job queue gets back to less than 10% full.
290 : //
291 : // Must be called with cm.mu locked.
292 2 : func (cm *cleanupManager) maybeLogLocked() {
293 2 : const highThreshold = jobsQueueDepth * 3 / 4
294 2 : const lowThreshold = jobsQueueDepth / 10
295 2 :
296 2 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
297 2 :
298 2 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
299 0 : cm.mu.jobsQueueWarningIssued = true
300 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
301 0 : }
302 :
303 2 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
304 0 : cm.mu.jobsQueueWarningIssued = false
305 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
306 0 : }
307 : }
|