Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/errors/oserror"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/pebble/wal"
23 : "github.com/cockroachdb/tokenbucket"
24 : )
25 :
26 : // Cleaner exports the base.Cleaner type.
27 : type Cleaner = base.Cleaner
28 :
29 : // DeleteCleaner exports the base.DeleteCleaner type.
30 : type DeleteCleaner = base.DeleteCleaner
31 :
32 : // ArchiveCleaner exports the base.ArchiveCleaner type.
33 : type ArchiveCleaner = base.ArchiveCleaner
34 :
35 : type cleanupManager struct {
36 : opts *Options
37 : objProvider objstorage.Provider
38 : deletePacer *deletionPacer
39 :
40 : // jobsCh is used as the cleanup job queue.
41 : jobsCh chan *cleanupJob
42 : // waitGroup is used to wait for the background goroutine to exit.
43 : waitGroup sync.WaitGroup
44 : // stopCh is closed when the cleanup manager is closed, disabling pacing for any remaining jobs.
45 : stopCh chan struct{}
46 :
47 : mu struct {
48 : sync.Mutex
49 : // totalJobs is the total number of enqueued jobs (completed or in progress).
50 : totalJobs int
51 : completedStats obsoleteObjectStats
52 : completedJobs int
53 : completedJobsCond sync.Cond
54 : jobsQueueWarningIssued bool
55 : }
56 : }
57 :
58 : // CompletedStats returns the stats summarizing objects deleted. The returned
59 : // stats increase monotonically over the lifetime of the DB.
60 0 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
61 0 : m.mu.Lock()
62 0 : defer m.mu.Unlock()
63 0 : return m.mu.completedStats
64 0 : }
65 :
66 : // We can queue this many jobs before we have to block EnqueueJob.
67 : const jobsQueueDepth = 1000
68 : const jobsQueueHighThreshold = jobsQueueDepth * 3 / 4
69 : const jobsQueueLowThreshold = jobsQueueDepth / 10
70 :
71 : // obsoleteFile holds information about a file that needs to be deleted soon.
72 : type obsoleteFile struct {
73 : fileType base.FileType
74 : fs vfs.FS
75 : path string
76 : fileNum base.DiskFileNum
77 : fileSize uint64 // approx for log files
78 : isLocal bool
79 : }
80 :
81 1 : func (of *obsoleteFile) needsPacing() bool {
82 1 : // We only need to pace local objects--sstables and blob files.
83 1 : return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
84 1 : }
85 :
86 : type cleanupJob struct {
87 : jobID JobID
88 : obsoleteFiles []obsoleteFile
89 : stats obsoleteObjectStats
90 : }
91 :
92 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
93 : // The cleanupManager must be Close()d.
94 : func openCleanupManager(
95 : opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
96 1 : ) *cleanupManager {
97 1 : cm := &cleanupManager{
98 1 : opts: opts,
99 1 : objProvider: objProvider,
100 1 : deletePacer: newDeletionPacer(
101 1 : crtime.NowMono(),
102 1 : opts.FreeSpaceThresholdBytes,
103 1 : opts.TargetByteDeletionRate,
104 1 : opts.FreeSpaceTimeframe,
105 1 : opts.ObsoleteBytesMaxRatio,
106 1 : opts.ObsoleteBytesTimeframe,
107 1 : getDeletePacerInfo,
108 1 : ),
109 1 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
110 1 : stopCh: make(chan struct{}),
111 1 : }
112 1 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
113 1 : cm.waitGroup.Add(1)
114 1 :
115 1 : go func() {
116 1 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
117 1 : cm.mainLoop()
118 1 : })
119 : }()
120 :
121 1 : return cm
122 : }
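
// Illustrative only (not part of the original file): a minimal sketch of the
// manager lifecycle using the functions defined above and below. The opts,
// provider, jobID, and files values are placeholders supplied by the caller.
func exampleCleanupManagerLifecycle(
	opts *Options, provider objstorage.Provider, jobID JobID, files []obsoleteFile,
) {
	cm := openCleanupManager(opts, provider, func() deletionPacerInfo {
		// A real callback reports free, live, and obsolete bytes (see
		// getDeletionPacerInfo below); the zero value is just a placeholder.
		return deletionPacerInfo{}
	})
	cm.EnqueueJob(jobID, files, obsoleteObjectStats{})
	cm.Wait()  // blocks until the job above has been processed
	cm.Close() // stops the background goroutine; remaining jobs run unpaced
}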
123 :
124 : // Close stops the background goroutine, waiting until all queued jobs are completed.
125 : // Delete pacing is disabled for the remaining jobs.
126 1 : func (cm *cleanupManager) Close() {
127 1 : close(cm.jobsCh)
128 1 : close(cm.stopCh)
129 1 : cm.waitGroup.Wait()
130 1 : }
131 :
132 : // EnqueueJob adds a cleanup job to the manager's queue.
133 : func (cm *cleanupManager) EnqueueJob(
134 : jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
135 1 : ) {
136 1 : job := &cleanupJob{
137 1 : jobID: jobID,
138 1 : obsoleteFiles: obsoleteFiles,
139 1 : stats: stats,
140 1 : }
141 1 :
142 1 : // Report deleted bytes to the pacer, which can use this data to potentially
143 1 : // increase the deletion rate to keep up. We want to do this at enqueue time
144 1 : // rather than when we get to the job; otherwise the reported bytes would be
145 1 : // subject to the throttling rate, which defeats the purpose.
146 1 : var pacingBytes uint64
147 1 : for _, of := range obsoleteFiles {
148 1 : if of.needsPacing() {
149 1 : pacingBytes += of.fileSize
150 1 : }
151 : }
152 1 : if pacingBytes > 0 {
153 1 : cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
154 1 : }
155 :
156 1 : cm.mu.Lock()
157 1 : cm.mu.totalJobs++
158 1 : cm.maybeLogLocked()
159 1 : cm.mu.Unlock()
160 1 :
161 1 : cm.jobsCh <- job
162 : }
163 :
164 : // Wait until the completion of all jobs that were already queued.
165 : //
166 : // Does not wait for jobs that are enqueued during the call.
167 : //
168 : // Note that DB.mu should not be held while calling this method; the background
169 : // goroutine needs to acquire DB.mu to update deleted table metrics.
170 1 : func (cm *cleanupManager) Wait() {
171 1 : cm.mu.Lock()
172 1 : defer cm.mu.Unlock()
173 1 : n := cm.mu.totalJobs
174 1 : for cm.mu.completedJobs < n {
175 1 : cm.mu.completedJobsCond.Wait()
176 1 : }
177 : }
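
// Illustrative only: a hypothetical caller of Wait. Wait must not be invoked
// with DB.mu held, because the background goroutine acquires DB.mu to update
// deleted table metrics, and only jobs enqueued before the call are covered.
func exampleWaitForQueuedCleanup(d *DB) {
	// Correct: d.mu is not held here.
	d.cleanupManager.Wait()
	// Jobs enqueued after this point require another Wait call.
}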
178 :
179 : // mainLoop runs the manager's background goroutine.
180 1 : func (cm *cleanupManager) mainLoop() {
181 1 : defer cm.waitGroup.Done()
182 1 :
183 1 : paceTimer := time.NewTimer(time.Duration(0))
184 1 : defer paceTimer.Stop()
185 1 :
186 1 : var tb tokenbucket.TokenBucket
187 1 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
188 1 : tb.Init(1.0, 1.0)
189 1 : for job := range cm.jobsCh {
190 1 : cm.deleteObsoleteFilesInJob(job, &tb, paceTimer)
191 1 : cm.mu.Lock()
192 1 : cm.mu.completedJobs++
193 1 : cm.mu.completedStats.Add(job.stats)
194 1 : cm.mu.completedJobsCond.Broadcast()
195 1 : cm.maybeLogLocked()
196 1 : cm.mu.Unlock()
197 1 : }
198 : }
199 :
200 : // deleteObsoleteFilesInJob deletes all obsolete files in the given job. If tb
201 : // and paceTimer are provided, files that need pacing will be throttled
202 : // according to the deletion rate. If tb is nil, files are deleted immediately
203 : // without pacing (used when the cleanup manager is being closed).
204 : func (cm *cleanupManager) deleteObsoleteFilesInJob(
205 : job *cleanupJob, tb *tokenbucket.TokenBucket, paceTimer *time.Timer,
206 1 : ) {
207 1 : for _, of := range job.obsoleteFiles {
208 1 : switch of.fileType {
209 1 : case base.FileTypeTable, base.FileTypeBlob:
210 1 : if tb != nil {
211 1 : cm.maybePace(tb, &of, paceTimer)
212 1 : }
213 1 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
214 1 : default:
215 1 : cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
216 : }
217 : }
218 : }
219 :
220 : // maybePace sleeps before deleting an object if appropriate. It is always
221 : // called from the background goroutine.
222 : func (cm *cleanupManager) maybePace(
223 : tb *tokenbucket.TokenBucket, of *obsoleteFile, paceTimer *time.Timer,
224 1 : ) {
225 1 : if !of.needsPacing() {
226 1 : return
227 1 : }
228 1 : if len(cm.jobsCh) >= jobsQueueHighThreshold {
229 0 : // If there are many jobs queued up, disable pacing. In this state, we
230 0 : // execute deletion jobs at the same rate as new jobs get queued.
231 0 : return
232 0 : }
233 1 : tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
234 1 : if tokens == 0.0 {
235 1 : // The token bucket might be in debt; it could make us wait even for 0
236 1 : // tokens. We don't want that if the pacer decided throttling should be
237 1 : // disabled.
238 1 : return
239 1 : }
240 : // Wait for tokens. We use a token bucket instead of sleeping outright because
241 : // the token bucket accumulates up to one second of unused tokens.
242 1 : for {
243 1 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
244 1 : if ok {
245 1 : break
246 : }
247 1 : paceTimer.Reset(d)
248 1 : select {
249 1 : case <-paceTimer.C:
250 1 : case <-cm.stopCh:
251 1 : // The cleanup manager is being closed. Delete without pacing.
252 1 : return
253 : }
254 : }
255 : }
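
// A generic sketch of the wait-for-tokens pattern used by maybePace, assuming
// only the tokenbucket calls already used in this file (Init, TryToFulfill).
// The bucket is configured like mainLoop's: 1 token/second refill with a
// 1-token burst, so up to a second of unused capacity can absorb a burst of
// deletions without delay.
func examplePaceWithTokenBucket(cost float64, stop <-chan struct{}) {
	var tb tokenbucket.TokenBucket
	tb.Init(1.0, 1.0)
	timer := time.NewTimer(0)
	defer timer.Stop()
	for {
		ok, wait := tb.TryToFulfill(tokenbucket.Tokens(cost))
		if ok {
			return // enough tokens; proceed with the paced operation
		}
		timer.Reset(wait)
		select {
		case <-timer.C: // retry after the suggested delay
		case <-stop: // shutting down; stop pacing
			return
		}
	}
}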
256 :
257 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
258 : func (cm *cleanupManager) deleteObsoleteFile(
259 : fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
260 1 : ) {
261 1 : // TODO(peter): need to handle this error, probably by re-adding the
262 1 : // file that couldn't be deleted to one of the obsolete slices map.
263 1 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
264 1 : if oserror.IsNotExist(err) {
265 0 : return
266 0 : }
267 :
268 1 : switch fileType {
269 1 : case base.FileTypeLog:
270 1 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
271 1 : JobID: int(jobID),
272 1 : Path: path,
273 1 : FileNum: fileNum,
274 1 : Err: err,
275 1 : })
276 1 : case base.FileTypeManifest:
277 1 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
278 1 : JobID: int(jobID),
279 1 : Path: path,
280 1 : FileNum: fileNum,
281 1 : Err: err,
282 1 : })
283 0 : case base.FileTypeTable, base.FileTypeBlob:
284 0 : panic("invalid deletion of object file")
285 : }
286 : }
287 :
288 : func (cm *cleanupManager) deleteObsoleteObject(
289 : fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
290 1 : ) {
291 1 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
292 0 : panic("not an object")
293 : }
294 :
295 1 : var path string
296 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
297 1 : if err != nil {
298 0 : path = "<nil>"
299 1 : } else {
300 1 : path = cm.objProvider.Path(meta)
301 1 : err = cm.objProvider.Remove(fileType, fileNum)
302 1 : }
303 1 : if cm.objProvider.IsNotExistError(err) {
304 0 : return
305 0 : }
306 :
307 1 : switch fileType {
308 1 : case base.FileTypeTable:
309 1 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
310 1 : JobID: int(jobID),
311 1 : Path: path,
312 1 : FileNum: fileNum,
313 1 : Err: err,
314 1 : })
315 1 : case base.FileTypeBlob:
316 1 : cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
317 1 : JobID: int(jobID),
318 1 : Path: path,
319 1 : FileNum: fileNum,
320 1 : Err: err,
321 1 : })
322 : }
323 : }
324 :
325 : // maybeLogLocked issues a warning log when the job queue exceeds 75% of its
326 : // capacity, and a follow-up log once the queue drains back below 10%.
327 : //
328 : // Must be called with cm.mu locked.
329 1 : func (cm *cleanupManager) maybeLogLocked() {
330 1 :
331 1 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
332 1 :
333 1 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > jobsQueueHighThreshold {
334 0 : cm.mu.jobsQueueWarningIssued = true
335 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", jobsQueueHighThreshold)
336 0 : }
337 :
338 1 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < jobsQueueLowThreshold {
339 0 : cm.mu.jobsQueueWarningIssued = false
340 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", jobsQueueLowThreshold)
341 0 : }
342 : }
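
// A generic sketch (illustrative names, assuming base.Logger's Infof) of the
// same hysteresis idea: warn once when a gauge rises above a high watermark,
// and log recovery only after it falls back below a lower watermark, so a
// value hovering near a single threshold does not spam the log.
type watermarkWarner struct {
	high, low int
	warned    bool
}

func (w *watermarkWarner) observe(value int, log base.Logger) {
	if !w.warned && value > w.high {
		w.warned = true
		log.Infof("falling behind; value above %d", w.high)
	}
	if w.warned && value < w.low {
		w.warned = false
		log.Infof("back to normal; value below %d", w.low)
	}
}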
343 :
344 1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
345 1 : var pacerInfo deletionPacerInfo
346 1 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
347 1 : // but in practice this was observed to take constant time, regardless of
348 1 : // the volume size, at least on Linux with ext4 and zfs. All invocations
349 1 : // take 10 microseconds or less.
350 1 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
351 1 : d.mu.Lock()
352 1 : m := &d.mu.versions.metrics
353 1 : pacerInfo.obsoleteBytes = m.Table.ObsoleteSize + m.BlobFiles.ObsoleteSize
354 1 : total := m.Total()
355 1 : d.mu.Unlock()
356 1 : pacerInfo.liveBytes = uint64(total.AggregateSize())
357 1 : return pacerInfo
358 1 : }
359 :
360 : // scanObsoleteFiles compares the provided directory listing to the set of
361 : // known, in-use files to find files no longer needed and adds those to the
362 : // internal lists of obsolete files. Note that the files are not actually
363 : // deleted by this method. A subsequent call to deleteObsoleteFiles must be
364 : // performed. Must be not be called concurrently with compactions and flushes
365 : // and will panic if any are in-progress. db.mu must be held when calling this
366 : // function.
367 1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
368 1 : if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
369 0 : panic(errors.AssertionFailedf("compaction or flush in progress"))
370 : }
371 :
372 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
373 1 : d.mu.versions.addLiveFileNums(liveFileNums)
374 1 : // Protect files that are only referred to by the ingestedFlushable from
375 1 : // being deleted. These are added to the flushable queue on WAL replay
376 1 : // and handle their own obsoletion/deletion. We exclude them from this obsolete
377 1 : // file scan to avoid double-deleting these files.
378 1 : for _, f := range flushableIngests {
379 1 : for _, file := range f.files {
380 1 : liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
381 1 : }
382 : }
383 :
384 1 : manifestFileNum := d.mu.versions.manifestFileNum
385 1 :
386 1 : var obsoleteTables []obsoleteFile
387 1 : var obsoleteBlobs []obsoleteFile
388 1 : var obsoleteOptions []obsoleteFile
389 1 : var obsoleteManifests []obsoleteFile
390 1 :
391 1 : for _, filename := range list {
392 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
393 1 : if !ok {
394 1 : continue
395 : }
396 1 : makeObsoleteFile := func() obsoleteFile {
397 1 : of := obsoleteFile{
398 1 : fileType: fileType,
399 1 : fs: d.opts.FS,
400 1 : path: d.opts.FS.PathJoin(d.dirname, filename),
401 1 : fileNum: diskFileNum,
402 1 : isLocal: true,
403 1 : }
404 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
405 0 : of.fileSize = uint64(stat.Size())
406 0 : }
407 1 : return of
408 : }
409 1 : switch fileType {
410 1 : case base.FileTypeManifest:
411 1 : if diskFileNum >= manifestFileNum {
412 1 : continue
413 : }
414 1 : obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
415 1 : case base.FileTypeOptions:
416 1 : if diskFileNum >= d.optionsFileNum {
417 0 : continue
418 : }
419 1 : obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
420 1 : case base.FileTypeTable, base.FileTypeBlob:
421 : // Objects are handled through the objstorage provider below.
422 1 : default:
423 : // Don't delete files we don't know about.
424 : }
425 : }
426 :
427 1 : objects := d.objProvider.List()
428 1 : for _, obj := range objects {
429 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
430 1 : continue
431 : }
432 1 : if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
433 0 : // Ignore object types we don't know about.
434 0 : continue
435 : }
436 1 : of := obsoleteFile{
437 1 : fileType: obj.FileType,
438 1 : fs: d.opts.FS,
439 1 : path: base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
440 1 : fileNum: obj.DiskFileNum,
441 1 : isLocal: true,
442 1 : }
443 1 : if size, err := d.objProvider.Size(obj); err == nil {
444 1 : of.fileSize = uint64(size)
445 1 : }
446 1 : if obj.FileType == base.FileTypeTable {
447 1 : obsoleteTables = append(obsoleteTables, of)
448 1 : } else {
449 1 : obsoleteBlobs = append(obsoleteBlobs, of)
450 1 : }
451 : }
452 :
453 1 : d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
454 1 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
455 1 : d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
456 1 : d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
457 1 : d.mu.versions.updateObsoleteObjectMetricsLocked()
458 : }
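
// Illustrative only: a standalone sketch of the classification step above,
// bucketing a directory listing by parsed file type and ignoring names Pebble
// does not recognize. The fs, dirname, and list arguments are placeholders.
func exampleClassifyListing(fs vfs.FS, dirname string, list []string) (manifests, options []string) {
	for _, filename := range list {
		fileType, _, ok := base.ParseFilename(fs, filename)
		if !ok {
			continue // not a Pebble-managed file; leave it alone
		}
		switch fileType {
		case base.FileTypeManifest:
			manifests = append(manifests, fs.PathJoin(dirname, filename))
		case base.FileTypeOptions:
			options = append(options, fs.PathJoin(dirname, filename))
		}
	}
	return manifests, options
}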
459 :
460 : // disableFileDeletions disables file deletions and then waits for any
461 : // in-progress deletion to finish. The caller is required to call
462 : // enableFileDeletions in order to enable file deletions again. It is ok for
463 : // multiple callers to disable file deletions simultaneously, though they must
464 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
465 : // (there is an internal reference count on file deletion disablement).
466 : //
467 : // d.mu must be held when calling this method.
468 1 : func (d *DB) disableFileDeletions() {
469 1 : d.mu.fileDeletions.disableCount++
470 1 : d.mu.Unlock()
471 1 : defer d.mu.Lock()
472 1 : d.cleanupManager.Wait()
473 1 : }
474 :
475 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
476 : // is queued if necessary.
477 : //
478 : // d.mu must be held when calling this method.
479 1 : func (d *DB) enableFileDeletions() {
480 1 : if d.mu.fileDeletions.disableCount <= 0 {
481 0 : panic("pebble: file deletion disablement invariant violated")
482 : }
483 1 : d.mu.fileDeletions.disableCount--
484 1 : if d.mu.fileDeletions.disableCount > 0 {
485 0 : return
486 0 : }
487 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
488 : }
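
// Illustrative only: a hypothetical wrapper showing the intended pairing. Every
// disableFileDeletions call must be matched by an enableFileDeletions call; the
// internal reference count makes nesting by multiple callers safe. Both methods
// expect d.mu to be held when called.
func (d *DB) exampleWithFileDeletionsDisabled(fn func()) {
	d.mu.Lock()
	d.disableFileDeletions() // waits for in-flight cleanup jobs
	d.mu.Unlock()

	defer func() {
		d.mu.Lock()
		d.enableFileDeletions() // queues a cleanup job once the count drops to zero
		d.mu.Unlock()
	}()
	fn()
}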
489 :
490 : type fileInfo = base.FileInfo
491 :
492 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
493 : //
494 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
495 : //
496 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
497 : // cleanup job will be scheduled when file deletions are re-enabled.
498 1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
499 1 : if d.mu.fileDeletions.disableCount > 0 {
500 1 : return
501 1 : }
502 1 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
503 1 :
504 1 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
505 1 : // log that has not had its contents flushed to an sstable.
506 1 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
507 1 : if err != nil {
508 0 : panic(err)
509 : }
510 :
511 1 : obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
512 1 : d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
513 1 : obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
514 1 : d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
515 1 :
516 1 : // Ensure everything is already sorted. We want determinism for testing, and
517 1 : // we need the manifests to be sorted because we want to delete some
518 1 : // contiguous prefix of the older manifests.
519 1 : if invariants.Enabled {
520 1 : switch {
521 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
522 0 : d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
523 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
524 0 : d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
525 0 : case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
526 0 : d.opts.Logger.Fatalf("obsoleteTables is not sorted")
527 0 : case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
528 0 : d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
529 : }
530 : }
531 :
532 1 : var obsoleteManifests []obsoleteFile
533 1 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
534 1 : if manifestsToDelete > 0 {
535 1 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
536 1 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
537 1 : if len(d.mu.versions.obsoleteManifests) == 0 {
538 0 : d.mu.versions.obsoleteManifests = nil
539 0 : }
540 : }
541 :
542 1 : obsoleteOptions := d.mu.versions.obsoleteOptions
543 1 : d.mu.versions.obsoleteOptions = nil
544 1 :
545 1 : // Compute the stats for the files being queued for deletion and add them to
546 1 : // the running total. These stats will be used during DB.Metrics() to
547 1 : // calculate the count and size of pending obsolete files by diffing these
548 1 : // stats and the stats reported by the cleanup manager.
549 1 : var objectStats obsoleteObjectStats
550 1 : objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
551 1 : objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
552 1 : d.mu.fileDeletions.queuedStats.Add(objectStats)
553 1 : d.mu.versions.updateObsoleteObjectMetricsLocked()
554 1 :
555 1 : // Release d.mu while preparing the cleanup job and possibly waiting.
556 1 : // Note the unusual order: Unlock and then Lock.
557 1 : d.mu.Unlock()
558 1 : defer d.mu.Lock()
559 1 :
560 1 : n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
561 1 : filesToDelete := make([]obsoleteFile, 0, n)
562 1 : filesToDelete = append(filesToDelete, obsoleteManifests...)
563 1 : filesToDelete = append(filesToDelete, obsoleteOptions...)
564 1 : filesToDelete = append(filesToDelete, obsoleteTables...)
565 1 : filesToDelete = append(filesToDelete, obsoleteBlobs...)
566 1 : for _, f := range obsoleteLogs {
567 1 : filesToDelete = append(filesToDelete, obsoleteFile{
568 1 : fileType: base.FileTypeLog,
569 1 : fs: f.FS,
570 1 : path: f.Path,
571 1 : fileNum: base.DiskFileNum(f.NumWAL),
572 1 : fileSize: f.ApproxFileSize,
573 1 : isLocal: true,
574 1 : })
575 1 : }
576 1 : for _, f := range obsoleteTables {
577 1 : d.fileCache.Evict(f.fileNum, base.FileTypeTable)
578 1 : }
579 1 : for _, f := range obsoleteBlobs {
580 1 : d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
581 1 : }
582 1 : if len(filesToDelete) > 0 {
583 1 : d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
584 1 : }
585 1 : if d.opts.private.testingAlwaysWaitForCleanup {
586 0 : d.cleanupManager.Wait()
587 0 : }
588 : }
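
// A generic, self-contained sketch of the "Unlock, then defer Lock" order used
// above: the mutex is held on entry, released around a step that may block
// (handing work to the cleanup goroutine), and re-acquired by the defer so the
// caller gets the lock back on return.
func exampleUnlockForSlowWork(mu *sync.Mutex, slow func()) {
	// Precondition: mu is held by the caller.
	mu.Unlock()
	defer mu.Lock() // re-acquired before returning to the caller
	slow()
}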
589 :
590 1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
591 1 : d.mu.Lock()
592 1 : defer d.mu.Unlock()
593 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
594 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
595 1 : }
596 : }
597 :
598 1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
599 1 : if len(b) == 0 {
600 1 : return a
601 1 : }
602 :
603 1 : a = append(a, b...)
604 1 : slices.SortFunc(a, cmpObsoleteFileNumbers)
605 1 : return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
606 1 : return a.fileNum == b.fileNum
607 1 : })
608 : }
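
// A minimal standalone illustration of the same merge-and-dedupe technique on
// plain ints: append, sort, then compact adjacent duplicates. For example,
// a=[1,3] and b=[2,3] yield [1,2,3].
func exampleMergeDedupe(a, b []int) []int {
	if len(b) == 0 {
		return a
	}
	a = append(a, b...)
	slices.Sort(a)
	return slices.Compact(a)
}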
609 :
610 1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
611 1 : return cmp.Compare(a.fileNum, b.fileNum)
612 1 : }
613 :
614 : // objectInfo describes an object in object storage (either an sstable or a blob
615 : // file).
616 : type objectInfo struct {
617 : fileInfo
618 : isLocal bool
619 : }
620 :
621 1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
622 1 : return obsoleteFile{
623 1 : fileType: fileType,
624 1 : fs: fs,
625 1 : path: base.MakeFilepath(fs, dirname, fileType, o.FileNum),
626 1 : fileNum: o.FileNum,
627 1 : fileSize: o.FileSize,
628 1 : isLocal: o.isLocal,
629 1 : }
630 1 : }
631 :
632 1 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
633 1 : for _, of := range files {
634 1 : if of.isLocal {
635 1 : local.count++
636 1 : local.size += of.fileSize
637 1 : }
638 1 : total.count++
639 1 : total.size += of.fileSize
640 : }
641 1 : return total, local
642 : }
643 :
644 : type obsoleteObjectStats struct {
645 : tablesLocal countAndSize
646 : tablesAll countAndSize
647 : blobFilesLocal countAndSize
648 : blobFilesAll countAndSize
649 : }
650 :
651 1 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
652 1 : s.tablesLocal.Add(other.tablesLocal)
653 1 : s.tablesAll.Add(other.tablesAll)
654 1 : s.blobFilesLocal.Add(other.blobFilesLocal)
655 1 : s.blobFilesAll.Add(other.blobFilesAll)
656 1 : }
657 :
658 0 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
659 0 : s.tablesLocal.Sub(other.tablesLocal)
660 0 : s.tablesAll.Sub(other.tablesAll)
661 0 : s.blobFilesLocal.Sub(other.blobFilesLocal)
662 0 : s.blobFilesAll.Sub(other.blobFilesAll)
663 0 : }
664 :
665 : type countAndSize struct {
666 : count uint64
667 : size uint64
668 : }
669 :
670 1 : func (c *countAndSize) Add(other countAndSize) {
671 1 : c.count += other.count
672 1 : c.size += other.size
673 1 : }
674 :
675 0 : func (c *countAndSize) Sub(other countAndSize) {
676 0 : c.count = invariants.SafeSub(c.count, other.count)
677 0 : c.size = invariants.SafeSub(c.size, other.size)
678 0 : }
679 :
680 1 : func makeZombieObjects() zombieObjects {
681 1 : return zombieObjects{
682 1 : objs: make(map[base.DiskFileNum]objectInfo),
683 1 : }
684 1 : }
685 :
686 : // zombieObjects tracks a set of objects that are no longer required by the most
687 : // recent version of the LSM, but may still need to be accessed by an open
688 : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
689 : // may access them are closed.
690 : type zombieObjects struct {
691 : objs map[base.DiskFileNum]objectInfo
692 : totalSize uint64
693 : localSize uint64
694 : localCount uint64
695 : }
696 :
697 : // Add adds an object to the set of zombie objects.
698 1 : func (z *zombieObjects) Add(obj objectInfo) {
699 1 : if _, ok := z.objs[obj.FileNum]; ok {
700 0 : panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
701 : }
702 1 : z.objs[obj.FileNum] = obj
703 1 : z.totalSize += obj.FileSize
704 1 : if obj.isLocal {
705 1 : z.localSize += obj.FileSize
706 1 : z.localCount++
707 1 : }
708 : }
709 :
710 : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
711 1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
712 1 : z.Add(objectInfo{
713 1 : fileInfo: fileInfo{
714 1 : FileNum: meta.DiskFileNum,
715 1 : FileSize: size,
716 1 : },
717 1 : isLocal: !meta.IsRemote(),
718 1 : })
719 1 : }
720 :
721 : // Count returns the number of zombie objects.
722 1 : func (z *zombieObjects) Count() int {
723 1 : return len(z.objs)
724 1 : }
725 :
726 : // Extract removes an object from the set of zombie objects, returning the
727 : // object that was removed.
728 1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
729 1 : obj, ok := z.objs[fileNum]
730 1 : if !ok {
731 0 : panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
732 : }
733 1 : delete(z.objs, fileNum)
734 1 :
735 1 : // Detect underflow in case we have a bug that causes an object's size to be
736 1 : // mutated.
737 1 : if z.totalSize < obj.FileSize {
738 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
739 : }
740 1 : if obj.isLocal && z.localSize < obj.FileSize {
741 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
742 : }
743 :
744 1 : z.totalSize -= obj.FileSize
745 1 : if obj.isLocal {
746 1 : z.localSize -= obj.FileSize
747 1 : z.localCount--
748 1 : }
749 1 : return obj
750 : }
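
// Illustrative only (placeholder values): the zombie bookkeeping above in use.
// An object is Added when it leaves the latest version but may still be read
// by open iterators, and Extracted once it is safe to delete; the total and
// local counters move in lockstep.
func exampleZombieBookkeeping() {
	z := makeZombieObjects()
	z.Add(objectInfo{
		fileInfo: fileInfo{FileNum: base.DiskFileNum(7), FileSize: 1 << 20},
		isLocal:  true,
	})
	_ = z.Count()                         // 1
	obj := z.Extract(base.DiskFileNum(7)) // removes the entry and adjusts the size counters
	_ = obj
}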
751 :
752 : // TotalSize returns the size of all objects in the set.
753 0 : func (z *zombieObjects) TotalSize() uint64 {
754 0 : return z.totalSize
755 0 : }
756 :
757 : // LocalStats returns the count and size of all local objects in the set.
758 0 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
759 0 : return z.localCount, z.localSize
760 0 : }
|