Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "runtime/pprof"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/vfs"
17 : "github.com/cockroachdb/pebble/wal"
18 : "github.com/cockroachdb/tokenbucket"
19 : )
20 :
// Cleaner exports the base.Cleaner type, the interface used to clean
// (delete or archive) obsolete files.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type, which deletes files
// outright.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type, which moves files
// aside instead of deleting them.
type ArchiveCleaner = base.ArchiveCleaner
29 :
// cleanupManager deletes obsolete files (tables, WALs, manifests, etc.) on a
// background goroutine, optionally pacing table deletions via deletePacer.
type cleanupManager struct {
	// The following fields are set once in openCleanupManager and not
	// reassigned afterward (within this file).
	opts            *Options
	objProvider     objstorage.Provider
	onTableDeleteFn func(fileSize uint64)
	deletePacer     *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs int
		// completedJobs is the number of jobs the background goroutine has
		// fully processed.
		completedJobs int
		// completedJobsCond is broadcast each time completedJobs advances;
		// Wait blocks on it.
		completedJobsCond sync.Cond
		// jobsQueueWarningIssued is true after the "cleanup falling behind"
		// warning has been logged and before the "back to normal" message
		// clears it (see maybeLogLocked).
		jobsQueueWarningIssued bool
	}
}
50 :
// jobsQueueDepth is the buffer size of jobsCh: we can queue this many jobs
// before EnqueueJob has to block.
const jobsQueueDepth = 1000
53 :
// deletableFile is used for non log files.
type deletableFile struct {
	// dir is the directory containing the file.
	dir     string
	fileNum base.DiskFileNum
	// fileSize is the size of the file; used for deletion pacing of tables.
	fileSize uint64
}
60 :
// obsoleteFile holds information about a file that needs to be deleted soon.
// Exactly one of nonLogFile/logFile is meaningful, selected by fileType.
type obsoleteFile struct {
	fileType fileType
	// nonLogFile is populated when fileType != fileTypeLog.
	nonLogFile deletableFile
	// logFile is populated when fileType == fileTypeLog; it carries its own
	// FS and path because WALs are managed by the wal package.
	logFile wal.DeletableLog
}
69 :
// cleanupJob is a unit of work for the cleanup goroutine: a batch of obsolete
// files enqueued together under one job ID.
type cleanupJob struct {
	jobID         int
	obsoleteFiles []obsoleteFile
}
74 :
75 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
76 : // The cleanupManager must be Close()d.
77 : func openCleanupManager(
78 : opts *Options,
79 : objProvider objstorage.Provider,
80 : onTableDeleteFn func(fileSize uint64),
81 : getDeletePacerInfo func() deletionPacerInfo,
82 1 : ) *cleanupManager {
83 1 : cm := &cleanupManager{
84 1 : opts: opts,
85 1 : objProvider: objProvider,
86 1 : onTableDeleteFn: onTableDeleteFn,
87 1 : deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
88 1 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
89 1 : }
90 1 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
91 1 : cm.waitGroup.Add(1)
92 1 :
93 1 : go func() {
94 1 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
95 1 : cm.mainLoop()
96 1 : })
97 : }()
98 :
99 1 : return cm
100 : }
101 :
// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
func (cm *cleanupManager) Close() {
	// Closing jobsCh ends mainLoop's range loop once the queue drains;
	// waitGroup.Wait then blocks until that goroutine has exited.
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}
108 :
109 : // EnqueueJob adds a cleanup job to the manager's queue.
110 1 : func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
111 1 : job := &cleanupJob{
112 1 : jobID: jobID,
113 1 : obsoleteFiles: obsoleteFiles,
114 1 : }
115 1 :
116 1 : // Report deleted bytes to the pacer, which can use this data to potentially
117 1 : // increase the deletion rate to keep up. We want to do this at enqueue time
118 1 : // rather than when we get to the job, otherwise the reported bytes will be
119 1 : // subject to the throttling rate which defeats the purpose.
120 1 : var pacingBytes uint64
121 1 : for _, of := range obsoleteFiles {
122 1 : if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
123 1 : pacingBytes += of.nonLogFile.fileSize
124 1 : }
125 : }
126 1 : if pacingBytes > 0 {
127 1 : cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
128 1 : }
129 :
130 1 : cm.mu.Lock()
131 1 : cm.mu.totalJobs++
132 1 : cm.maybeLogLocked()
133 1 : cm.mu.Unlock()
134 1 :
135 1 : cm.jobsCh <- job
136 : }
137 :
// Wait until the completion of all jobs that were already queued.
//
// Does not wait for jobs that are enqueued during the call.
//
// Note that DB.mu should not be held while calling this method; the background
// goroutine needs to acquire DB.mu to update deleted table metrics.
func (cm *cleanupManager) Wait() {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	// Snapshot totalJobs so that jobs enqueued after this point don't extend
	// the wait.
	n := cm.mu.totalJobs
	for cm.mu.completedJobs < n {
		cm.mu.completedJobsCond.Wait()
	}
}
152 :
// mainLoop runs the manager's background goroutine: it drains jobsCh,
// deleting each job's obsolete files, until the channel is closed by Close.
func (cm *cleanupManager) mainLoop() {
	defer cm.waitGroup.Done()

	var tb tokenbucket.TokenBucket
	// Use a token bucket with 1 token / second refill rate and 1 token burst.
	tb.Init(1.0, 1.0)
	for job := range cm.jobsCh {
		for _, of := range job.obsoleteFiles {
			switch of.fileType {
			case fileTypeTable:
				// Tables are objects managed by objProvider; their deletion
				// may be paced (see maybePace/needsPacing).
				cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
				cm.onTableDeleteFn(of.nonLogFile.fileSize)
				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
			case fileTypeLog:
				// WALs carry their own FS and path in wal.DeletableLog; they
				// are deleted without pacing.
				cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
					base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
			default:
				// All other file types (e.g. manifests) live under
				// nonLogFile.dir on opts.FS.
				path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
				cm.deleteObsoleteFile(
					cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
			}
		}
		// Mark the job complete and wake any Wait()ers.
		cm.mu.Lock()
		cm.mu.completedJobs++
		cm.mu.completedJobsCond.Broadcast()
		cm.maybeLogLocked()
		cm.mu.Unlock()
	}
}
183 :
184 : // fileNumIfSST is read iff fileType is fileTypeTable.
185 1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
186 1 : if fileType != fileTypeTable {
187 1 : return false
188 1 : }
189 1 : meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
190 1 : if err != nil {
191 0 : // The object was already removed from the provider; we won't actually
192 0 : // delete anything, so we don't need to pace.
193 0 : return false
194 0 : }
195 : // Don't throttle deletion of remote objects.
196 1 : return !meta.IsRemote()
197 : }
198 :
199 : // maybePace sleeps before deleting an object if appropriate. It is always
200 : // called from the background goroutine.
201 : func (cm *cleanupManager) maybePace(
202 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
203 1 : ) {
204 1 : if !cm.needsPacing(fileType, fileNum) {
205 1 : return
206 1 : }
207 :
208 1 : tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
209 1 : if tokens == 0.0 {
210 1 : // The token bucket might be in debt; it could make us wait even for 0
211 1 : // tokens. We don't want that if the pacer decided throttling should be
212 1 : // disabled.
213 1 : return
214 1 : }
215 : // Wait for tokens. We use a token bucket instead of sleeping outright because
216 : // the token bucket accumulates up to one second of unused tokens.
217 1 : for {
218 1 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
219 1 : if ok {
220 1 : break
221 : }
222 0 : time.Sleep(d)
223 : }
224 : }
225 :
226 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
227 : func (cm *cleanupManager) deleteObsoleteFile(
228 : fs vfs.FS, fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
229 1 : ) {
230 1 : // TODO(peter): need to handle this error, probably by re-adding the
231 1 : // file that couldn't be deleted to one of the obsolete slices map.
232 1 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
233 1 : if oserror.IsNotExist(err) {
234 0 : return
235 0 : }
236 :
237 1 : switch fileType {
238 1 : case fileTypeLog:
239 1 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
240 1 : JobID: jobID,
241 1 : Path: path,
242 1 : FileNum: fileNum,
243 1 : Err: err,
244 1 : })
245 1 : case fileTypeManifest:
246 1 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
247 1 : JobID: jobID,
248 1 : Path: path,
249 1 : FileNum: fileNum,
250 1 : Err: err,
251 1 : })
252 0 : case fileTypeTable:
253 0 : panic("invalid deletion of object file")
254 : }
255 : }
256 :
257 : func (cm *cleanupManager) deleteObsoleteObject(
258 : fileType fileType, jobID int, fileNum base.DiskFileNum,
259 1 : ) {
260 1 : if fileType != fileTypeTable {
261 0 : panic("not an object")
262 : }
263 :
264 1 : var path string
265 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
266 1 : if err != nil {
267 0 : path = "<nil>"
268 1 : } else {
269 1 : path = cm.objProvider.Path(meta)
270 1 : err = cm.objProvider.Remove(fileType, fileNum)
271 1 : }
272 1 : if cm.objProvider.IsNotExistError(err) {
273 0 : return
274 0 : }
275 :
276 1 : switch fileType {
277 1 : case fileTypeTable:
278 1 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
279 1 : JobID: jobID,
280 1 : Path: path,
281 1 : FileNum: fileNum,
282 1 : Err: err,
283 1 : })
284 : }
285 : }
286 :
287 : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
288 : // when the job queue gets back to less than 10% full.
289 : //
290 : // Must be called with cm.mu locked.
291 1 : func (cm *cleanupManager) maybeLogLocked() {
292 1 : const highThreshold = jobsQueueDepth * 3 / 4
293 1 : const lowThreshold = jobsQueueDepth / 10
294 1 :
295 1 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
296 1 :
297 1 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
298 0 : cm.mu.jobsQueueWarningIssued = true
299 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
300 0 : }
301 :
302 1 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
303 0 : cm.mu.jobsQueueWarningIssued = false
304 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
305 0 : }
306 : }
|