Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "slices"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/errors/oserror"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/deletepacer"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/metrics"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/vfs"
19 : "github.com/cockroachdb/pebble/wal"
20 : )
21 :
// Cleaner exports the base.Cleaner type: the policy (via Clean) applied to
// obsolete files once the delete pacer decides to remove them.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
type ArchiveCleaner = base.ArchiveCleaner
30 :
31 : func openDeletePacer(
32 : opts *Options, objProvider objstorage.Provider, diskFreeSpaceFn deletepacer.DiskFreeSpaceFn,
33 1 : ) *deletepacer.DeletePacer {
34 1 : return deletepacer.Open(
35 1 : opts.DeletionPacing,
36 1 : opts.Logger,
37 1 : diskFreeSpaceFn,
38 1 : func(of deletepacer.ObsoleteFile, jobID int) {
39 1 : deleteObsoleteFile(opts.Cleaner, objProvider, opts.EventListener, of, jobID)
40 1 : },
41 : )
42 : }
43 :
44 : // deleteObsoleteFile deletes a file or object, once the delete pacer decided it is time.
45 : func deleteObsoleteFile(
46 : cleaner Cleaner,
47 : objProvider objstorage.Provider,
48 : eventListener *EventListener,
49 : of deletepacer.ObsoleteFile,
50 : jobID int,
51 1 : ) {
52 1 : path := of.Path
53 1 : var err error
54 1 : if of.FileType == base.FileTypeTable || of.FileType == base.FileTypeBlob {
55 1 : var meta objstorage.ObjectMetadata
56 1 : meta, err = objProvider.Lookup(of.FileType, of.FileNum)
57 1 : if err != nil {
58 0 : path = "<nil>"
59 1 : } else {
60 1 : path = objProvider.Path(meta)
61 1 : err = objProvider.Remove(of.FileType, of.FileNum)
62 1 : }
63 1 : if objProvider.IsNotExistError(err) {
64 0 : return
65 0 : }
66 1 : } else {
67 1 : // TODO(peter): need to handle this error, probably by re-adding the
68 1 : // file that couldn't be deleted to one of the obsolete slices map.
69 1 : err := cleaner.Clean(of.FS, of.FileType, path)
70 1 : if oserror.IsNotExist(err) {
71 0 : return
72 0 : }
73 : }
74 :
75 1 : switch of.FileType {
76 1 : case base.FileTypeTable:
77 1 : eventListener.TableDeleted(TableDeleteInfo{
78 1 : JobID: jobID,
79 1 : Path: path,
80 1 : FileNum: of.FileNum,
81 1 : Err: err,
82 1 : })
83 1 : case base.FileTypeBlob:
84 1 : eventListener.BlobFileDeleted(BlobFileDeleteInfo{
85 1 : JobID: jobID,
86 1 : Path: path,
87 1 : FileNum: of.FileNum,
88 1 : Err: err,
89 1 : })
90 1 : case base.FileTypeLog:
91 1 : eventListener.WALDeleted(WALDeleteInfo{
92 1 : JobID: jobID,
93 1 : Path: path,
94 1 : FileNum: of.FileNum,
95 1 : Err: err,
96 1 : })
97 1 : case base.FileTypeManifest:
98 1 : eventListener.ManifestDeleted(ManifestDeleteInfo{
99 1 : JobID: jobID,
100 1 : Path: path,
101 1 : FileNum: of.FileNum,
102 1 : Err: err,
103 1 : })
104 : }
105 : }
106 :
107 : // scanObsoleteFiles compares the provided directory listing to the set of
108 : // known, in-use files to find files no longer needed and adds those to the
109 : // internal lists of obsolete files. Note that the files are not actually
110 : // deleted by this method. A subsequent call to deleteObsoleteFiles must be
111 : // performed. Must not be called concurrently with compactions and flushes
112 : // and will panic if any are in-progress. db.mu must be held when calling this
113 : // function.
114 1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
115 1 : if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
116 0 : panic(errors.AssertionFailedf("compaction or flush in progress"))
117 : }
118 :
119 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
120 1 : d.mu.versions.addLiveFileNums(liveFileNums)
121 1 : // Protect against files which are only referred to by the ingestedFlushable
122 1 : // from being deleted. These are added to the flushable queue on WAL replay
123 1 : // and handle their own obsoletion/deletion. We exclude them from this obsolete
124 1 : // file scan to avoid double-deleting these files.
125 1 : for _, f := range flushableIngests {
126 1 : for _, file := range f.files {
127 1 : liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
128 1 : }
129 : }
130 :
131 1 : manifestFileNum := d.mu.versions.manifestFileNum
132 1 :
133 1 : var obsoleteTables []deletepacer.ObsoleteFile
134 1 : var obsoleteBlobs []deletepacer.ObsoleteFile
135 1 : var obsoleteOptions []deletepacer.ObsoleteFile
136 1 : var obsoleteManifests []deletepacer.ObsoleteFile
137 1 :
138 1 : for _, filename := range list {
139 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
140 1 : if !ok {
141 1 : continue
142 : }
143 1 : makeObsoleteFile := func() deletepacer.ObsoleteFile {
144 1 : of := deletepacer.ObsoleteFile{
145 1 : FileType: fileType,
146 1 : FS: d.opts.FS,
147 1 : Path: d.opts.FS.PathJoin(d.dirname, filename),
148 1 : FileNum: diskFileNum,
149 1 : Placement: base.Local,
150 1 : }
151 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
152 0 : of.FileSize = uint64(stat.Size())
153 0 : }
154 1 : return of
155 : }
156 1 : switch fileType {
157 1 : case base.FileTypeManifest:
158 1 : if diskFileNum >= manifestFileNum {
159 1 : continue
160 : }
161 1 : obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
162 1 : case base.FileTypeOptions:
163 1 : if diskFileNum >= d.optionsFileNum {
164 0 : continue
165 : }
166 1 : obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
167 1 : case base.FileTypeTable, base.FileTypeBlob:
168 : // Objects are handled through the objstorage provider below.
169 1 : default:
170 : // Don't delete files we don't know about.
171 : }
172 : }
173 :
174 1 : objects := d.objProvider.List()
175 1 : for _, obj := range objects {
176 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
177 1 : continue
178 : }
179 1 : if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
180 0 : // Ignore object types we don't know about.
181 0 : continue
182 : }
183 1 : of := deletepacer.ObsoleteFile{
184 1 : FileType: obj.FileType,
185 1 : FS: d.opts.FS,
186 1 : Path: base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
187 1 : FileNum: obj.DiskFileNum,
188 1 : Placement: obj.Placement(),
189 1 : }
190 1 : if size, err := d.objProvider.Size(obj); err == nil {
191 1 : of.FileSize = uint64(size)
192 1 : }
193 1 : if obj.FileType == base.FileTypeTable {
194 1 : obsoleteTables = append(obsoleteTables, of)
195 1 : } else {
196 1 : obsoleteBlobs = append(obsoleteBlobs, of)
197 1 : }
198 : }
199 :
200 1 : d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
201 1 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
202 1 : d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
203 1 : d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
204 : }
205 :
206 : // disableFileDeletions disables file deletions and then waits for any
207 : // in-progress deletion to finish. The caller is required to call
208 : // enableFileDeletions in order to enable file deletions again. It is ok for
209 : // multiple callers to disable file deletions simultaneously, though they must
210 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
211 : // (there is an internal reference count on file deletion disablement).
212 : //
213 : // d.mu must be held when calling this method.
func (d *DB) disableFileDeletions() {
	// Bump the disablement reference count while still holding d.mu;
	// enableFileDeletions decrements it.
	d.mu.fileDeletions.disableCount++
	// Release d.mu while waiting for in-flight deletions to drain, then
	// re-acquire it on return — the caller holds d.mu and expects it held.
	d.mu.Unlock()
	defer d.mu.Lock()
	d.deletePacer.WaitForTesting()
}
220 :
221 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
222 : // is queued if necessary.
223 : //
224 : // d.mu must be held when calling this method.
225 1 : func (d *DB) enableFileDeletions() {
226 1 : if d.mu.fileDeletions.disableCount <= 0 {
227 0 : panic("pebble: file deletion disablement invariant violated")
228 : }
229 1 : d.mu.fileDeletions.disableCount--
230 1 : if d.mu.fileDeletions.disableCount > 0 {
231 0 : return
232 0 : }
233 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
234 : }
235 :
// fileInfo is an alias for base.FileInfo (a file number and size pair).
type fileInfo = base.FileInfo
237 :
238 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
239 : //
240 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
241 : //
242 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
243 : // cleanup job will be scheduled when file deletions are re-enabled.
func (d *DB) deleteObsoleteFiles(jobID JobID) {
	// Deletions are disabled: enableFileDeletions will schedule a cleanup job
	// once the disable count drops back to zero.
	if d.mu.fileDeletions.disableCount > 0 {
		return
	}
	// If the cleaner needs to read file contents (NeedsFileContents), WALs
	// must not be recycled and are treated as obsolete instead.
	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)

	// NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
	// log that has not had its contents flushed to an sstable.
	obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
	if err != nil {
		panic(err)
	}

	// Take ownership of the accumulated obsolete tables/blobs, leaving the
	// (truncated) backing slices behind for reuse.
	obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
	d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
	obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
	d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]

	// Ensure everything is already sorted. We want determinism for testing, and
	// we need the manifests to be sorted because we want to delete some
	// contiguous prefix of the older manifests.
	if invariants.Enabled {
		switch {
		case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
		case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
		case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteTables is not sorted")
		case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
		}
	}

	// Retain the most recent NumPrevManifest manifests; only the sorted prefix
	// of older manifests is deleted.
	var obsoleteManifests []deletepacer.ObsoleteFile
	manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
	if manifestsToDelete > 0 {
		obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
		d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
		if len(d.mu.versions.obsoleteManifests) == 0 {
			d.mu.versions.obsoleteManifests = nil
		}
	}

	// All obsolete OPTIONS files are deleted.
	obsoleteOptions := d.mu.versions.obsoleteOptions
	d.mu.versions.obsoleteOptions = nil

	// Release d.mu while preparing the cleanup job and possibly waiting.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
	filesToDelete := make([]deletepacer.ObsoleteFile, 0, n)
	filesToDelete = append(filesToDelete, obsoleteManifests...)
	filesToDelete = append(filesToDelete, obsoleteOptions...)
	filesToDelete = append(filesToDelete, obsoleteTables...)
	filesToDelete = append(filesToDelete, obsoleteBlobs...)
	for _, f := range obsoleteLogs {
		filesToDelete = append(filesToDelete, deletepacer.ObsoleteFile{
			FileType:  base.FileTypeLog,
			FS:        f.FS,
			Path:      f.Path,
			FileNum:   base.DiskFileNum(f.NumWAL),
			FileSize:  f.ApproxFileSize,
			Placement: base.Local,
		})
	}
	// Evict deleted objects from the file cache before they are removed.
	for _, f := range obsoleteTables {
		d.fileCache.Evict(f.FileNum, base.FileTypeTable)
	}
	for _, f := range obsoleteBlobs {
		d.fileCache.Evict(f.FileNum, base.FileTypeBlob)
	}
	if len(filesToDelete) > 0 {
		d.deletePacer.Enqueue(int(jobID), filesToDelete...)
	}
	if d.opts.private.testingAlwaysWaitForCleanup {
		d.deletePacer.WaitForTesting()
	}
}
325 :
326 1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
327 1 : d.mu.Lock()
328 1 : defer d.mu.Unlock()
329 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
330 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
331 1 : }
332 : }
333 :
334 1 : func mergeObsoleteFiles(a, b []deletepacer.ObsoleteFile) []deletepacer.ObsoleteFile {
335 1 : if len(b) == 0 {
336 1 : return a
337 1 : }
338 :
339 1 : a = append(a, b...)
340 1 : slices.SortFunc(a, cmpObsoleteFileNumbers)
341 1 : return slices.CompactFunc(a, func(a, b deletepacer.ObsoleteFile) bool {
342 1 : return a.FileNum == b.FileNum
343 1 : })
344 : }
345 :
// cmpObsoleteFileNumbers orders obsolete files by their disk file number; it
// is the comparison function used for sorting and sortedness checks.
func cmpObsoleteFileNumbers(a, b deletepacer.ObsoleteFile) int {
	return cmp.Compare(a.FileNum, b.FileNum)
}
349 :
// objectInfo describes an object in object storage (either a sstable or a blob
// file).
type objectInfo struct {
	fileInfo
	// placement records where the object lives (e.g. local vs remote storage).
	placement base.Placement
}
356 :
357 : func (o objectInfo) asObsoleteFile(
358 : fs vfs.FS, fileType base.FileType, dirname string,
359 1 : ) deletepacer.ObsoleteFile {
360 1 : return deletepacer.ObsoleteFile{
361 1 : FileType: fileType,
362 1 : FS: fs,
363 1 : Path: base.MakeFilepath(fs, dirname, fileType, o.FileNum),
364 1 : FileNum: o.FileNum,
365 1 : FileSize: o.FileSize,
366 1 : Placement: o.placement,
367 1 : }
368 1 : }
369 :
370 1 : func makeZombieObjects() zombieObjects {
371 1 : return zombieObjects{
372 1 : objs: make(map[base.DiskFileNum]objectInfo),
373 1 : }
374 1 : }
375 :
// zombieObjects tracks a set of objects that are no longer required by the most
// recent version of the LSM, but may still need to be accessed by an open
// iterator. Such objects are 'dead,' but cannot be deleted until iterators that
// may access them are closed.
type zombieObjects struct {
	// objs maps each zombie object's disk file number to its info.
	objs map[base.DiskFileNum]objectInfo
}
383 :
384 : // Add adds an object to the set of zombie objects.
385 1 : func (z *zombieObjects) Add(obj objectInfo) {
386 1 : if _, ok := z.objs[obj.FileNum]; ok {
387 0 : panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
388 : }
389 1 : z.objs[obj.FileNum] = obj
390 : }
391 :
392 : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
393 1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
394 1 : z.Add(objectInfo{
395 1 : fileInfo: fileInfo{
396 1 : FileNum: meta.DiskFileNum,
397 1 : FileSize: size,
398 1 : },
399 1 : placement: meta.Placement(),
400 1 : })
401 1 : }
402 :
// Count returns the number of zombie objects currently in the set.
func (z *zombieObjects) Count() int {
	return len(z.objs)
}
407 :
408 : // Extract removes an object from the set of zombie objects, returning the
409 : // object that was removed.
410 1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
411 1 : obj, ok := z.objs[fileNum]
412 1 : if !ok {
413 0 : panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
414 : }
415 1 : delete(z.objs, fileNum)
416 1 : return obj
417 : }
418 :
419 : // Metrics returns the count and size of all objects in the set, broken down by placement.
420 0 : func (z *zombieObjects) Metrics() metrics.CountAndSizeByPlacement {
421 0 : var res metrics.CountAndSizeByPlacement
422 0 : for _, obj := range z.objs {
423 0 : res.Inc(obj.FileSize, obj.placement)
424 0 : }
425 0 : return res
426 : }
|