Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "runtime/pprof"
10 : "sync"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/tokenbucket"
18 : )
19 :
20 : // Cleaner exports the base.Cleaner type.
21 : type Cleaner = base.Cleaner
22 :
23 : // DeleteCleaner exports the base.DeleteCleaner type.
24 : type DeleteCleaner = base.DeleteCleaner
25 :
26 : // ArchiveCleaner exports the base.ArchiveCleaner type.
27 : type ArchiveCleaner = base.ArchiveCleaner
28 :
29 : type cleanupManager struct {
30 : opts *Options
31 : objProvider objstorage.Provider
32 : onTableDeleteFn func(fileSize uint64)
33 : deletePacer *deletionPacer
34 :
35 : // jobsCh is used as the cleanup job queue.
36 : jobsCh chan *cleanupJob
37 : // waitGroup is used to wait for the background goroutine to exit.
38 : waitGroup sync.WaitGroup
39 :
40 : mu struct {
41 : sync.Mutex
42 : // totalJobs is the total number of enqueued jobs (completed or in progress).
43 : totalJobs int
44 : completedJobs int
45 : completedJobsCond sync.Cond
46 : jobsQueueWarningIssued bool
47 : }
48 : }
49 :
// jobsQueueDepth is the buffer size of the cleanup job channel: this many jobs
// can be waiting before EnqueueJob has to block.
const jobsQueueDepth = 1_000
52 :
// obsoleteFile holds information about a file that needs to be deleted soon.
// It is queued for the cleanupManager's background goroutine via EnqueueJob.
type obsoleteFile struct {
	// dir is the directory containing the file; mainLoop uses it to build the
	// path of non-table files.
	dir string
	// fileNum identifies the file (for tables, the object looked up in the
	// objstorage.Provider).
	fileNum base.DiskFileNum
	// fileType selects the deletion route: tables go through the object
	// provider, every other type through Options.Cleaner.
	fileType fileType
	// fileSize is the file size in bytes; it is reported to the deletion pacer
	// and passed to onTableDeleteFn for tables.
	fileSize uint64
}
60 :
// cleanupJob is one unit of work for the cleanupManager's background
// goroutine: a batch of obsolete files tagged with the jobID passed to
// EnqueueJob.
type cleanupJob struct {
	// jobID is included in the deletion events sent to the EventListener.
	jobID int
	// obsoleteFiles are deleted one after another, in slice order, by mainLoop.
	obsoleteFiles []obsoleteFile
}
65 :
66 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
67 : // The cleanupManager must be Close()d.
68 : func openCleanupManager(
69 : opts *Options,
70 : objProvider objstorage.Provider,
71 : onTableDeleteFn func(fileSize uint64),
72 : getDeletePacerInfo func() deletionPacerInfo,
73 2 : ) *cleanupManager {
74 2 : cm := &cleanupManager{
75 2 : opts: opts,
76 2 : objProvider: objProvider,
77 2 : onTableDeleteFn: onTableDeleteFn,
78 2 : deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
79 2 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
80 2 : }
81 2 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
82 2 : cm.waitGroup.Add(1)
83 2 :
84 2 : go func() {
85 2 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
86 2 : cm.mainLoop()
87 2 : })
88 : }()
89 :
90 2 : return cm
91 : }
92 :
93 : // Close stops the background goroutine, waiting until all queued jobs are completed.
94 : // Delete pacing is disabled for the remaining jobs.
95 2 : func (cm *cleanupManager) Close() {
96 2 : close(cm.jobsCh)
97 2 : cm.waitGroup.Wait()
98 2 : }
99 :
100 : // EnqueueJob adds a cleanup job to the manager's queue.
101 2 : func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
102 2 : job := &cleanupJob{
103 2 : jobID: jobID,
104 2 : obsoleteFiles: obsoleteFiles,
105 2 : }
106 2 :
107 2 : // Report deleted bytes to the pacer, which can use this data to potentially
108 2 : // increase the deletion rate to keep up. We want to do this at enqueue time
109 2 : // rather than when we get to the job, otherwise the reported bytes will be
110 2 : // subject to the throttling rate which defeats the purpose.
111 2 : var pacingBytes uint64
112 2 : for _, of := range obsoleteFiles {
113 2 : if cm.needsPacing(of.fileType, of.fileNum) {
114 2 : pacingBytes += of.fileSize
115 2 : }
116 : }
117 2 : if pacingBytes > 0 {
118 2 : cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
119 2 : }
120 :
121 2 : cm.mu.Lock()
122 2 : cm.mu.totalJobs++
123 2 : cm.maybeLogLocked()
124 2 : cm.mu.Unlock()
125 2 :
126 2 : if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 {
127 0 : panic("cleanup jobs queue full")
128 : }
129 :
130 2 : cm.jobsCh <- job
131 : }
132 :
133 : // Wait until the completion of all jobs that were already queued.
134 : //
135 : // Does not wait for jobs that are enqueued during the call.
136 : //
137 : // Note that DB.mu should not be held while calling this method; the background
138 : // goroutine needs to acquire DB.mu to update deleted table metrics.
139 2 : func (cm *cleanupManager) Wait() {
140 2 : cm.mu.Lock()
141 2 : defer cm.mu.Unlock()
142 2 : n := cm.mu.totalJobs
143 2 : for cm.mu.completedJobs < n {
144 2 : cm.mu.completedJobsCond.Wait()
145 2 : }
146 : }
147 :
148 : // mainLoop runs the manager's background goroutine.
149 2 : func (cm *cleanupManager) mainLoop() {
150 2 : defer cm.waitGroup.Done()
151 2 :
152 2 : var tb tokenbucket.TokenBucket
153 2 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
154 2 : tb.Init(1.0, 1.0)
155 2 : for job := range cm.jobsCh {
156 2 : for _, of := range job.obsoleteFiles {
157 2 : if of.fileType != fileTypeTable {
158 2 : path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
159 2 : cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
160 2 : } else {
161 2 : cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
162 2 : cm.onTableDeleteFn(of.fileSize)
163 2 : cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
164 2 : }
165 : }
166 2 : cm.mu.Lock()
167 2 : cm.mu.completedJobs++
168 2 : cm.mu.completedJobsCond.Broadcast()
169 2 : cm.maybeLogLocked()
170 2 : cm.mu.Unlock()
171 : }
172 : }
173 :
174 2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNum base.DiskFileNum) bool {
175 2 : if fileType != fileTypeTable {
176 2 : return false
177 2 : }
178 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
179 2 : if err != nil {
180 0 : // The object was already removed from the provider; we won't actually
181 0 : // delete anything, so we don't need to pace.
182 0 : return false
183 0 : }
184 : // Don't throttle deletion of remote objects.
185 2 : return !meta.IsRemote()
186 : }
187 :
188 : // maybePace sleeps before deleting an object if appropriate. It is always
189 : // called from the background goroutine.
190 : func (cm *cleanupManager) maybePace(
191 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
192 2 : ) {
193 2 : if !cm.needsPacing(fileType, fileNum) {
194 2 : return
195 2 : }
196 :
197 2 : tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
198 2 : if tokens == 0.0 {
199 2 : // The token bucket might be in debt; it could make us wait even for 0
200 2 : // tokens. We don't want that if the pacer decided throttling should be
201 2 : // disabled.
202 2 : return
203 2 : }
204 : // Wait for tokens. We use a token bucket instead of sleeping outright because
205 : // the token bucket accumulates up to one second of unused tokens.
206 2 : for {
207 2 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
208 2 : if ok {
209 2 : break
210 : }
211 0 : time.Sleep(d)
212 : }
213 : }
214 :
215 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
216 : func (cm *cleanupManager) deleteObsoleteFile(
217 : fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
218 2 : ) {
219 2 : // TODO(peter): need to handle this error, probably by re-adding the
220 2 : // file that couldn't be deleted to one of the obsolete slices map.
221 2 : err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path)
222 2 : if oserror.IsNotExist(err) {
223 0 : return
224 0 : }
225 :
226 2 : switch fileType {
227 2 : case fileTypeLog:
228 2 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
229 2 : JobID: jobID,
230 2 : Path: path,
231 2 : FileNum: fileNum.FileNum(),
232 2 : Err: err,
233 2 : })
234 2 : case fileTypeManifest:
235 2 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
236 2 : JobID: jobID,
237 2 : Path: path,
238 2 : FileNum: fileNum.FileNum(),
239 2 : Err: err,
240 2 : })
241 0 : case fileTypeTable:
242 0 : panic("invalid deletion of object file")
243 : }
244 : }
245 :
246 : func (cm *cleanupManager) deleteObsoleteObject(
247 : fileType fileType, jobID int, fileNum base.DiskFileNum,
248 2 : ) {
249 2 : if fileType != fileTypeTable {
250 0 : panic("not an object")
251 : }
252 :
253 2 : var path string
254 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
255 2 : if err != nil {
256 0 : path = "<nil>"
257 2 : } else {
258 2 : path = cm.objProvider.Path(meta)
259 2 : err = cm.objProvider.Remove(fileType, fileNum)
260 2 : }
261 2 : if cm.objProvider.IsNotExistError(err) {
262 0 : return
263 0 : }
264 :
265 2 : switch fileType {
266 2 : case fileTypeTable:
267 2 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
268 2 : JobID: jobID,
269 2 : Path: path,
270 2 : FileNum: fileNum.FileNum(),
271 2 : Err: err,
272 2 : })
273 : }
274 : }
275 :
276 : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
277 : // when the job queue gets back to less than 10% full.
278 : //
279 : // Must be called with cm.mu locked.
280 2 : func (cm *cleanupManager) maybeLogLocked() {
281 2 : const highThreshold = jobsQueueDepth * 3 / 4
282 2 : const lowThreshold = jobsQueueDepth / 10
283 2 :
284 2 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
285 2 :
286 2 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
287 0 : cm.mu.jobsQueueWarningIssued = true
288 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
289 0 : }
290 :
291 2 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
292 0 : cm.mu.jobsQueueWarningIssued = false
293 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
294 0 : }
295 : }
|