Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "io"
11 : "os"
12 : "slices"
13 : "sync"
14 : "sync/atomic"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/errors/oserror"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
23 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
24 : "github.com/cockroachdb/pebble/objstorage/remote"
25 : "github.com/cockroachdb/pebble/vfs"
26 : )
27 :
// provider is the implementation of objstorage.Provider.
type provider struct {
	// st holds the settings the provider was opened with.
	st Settings

	// fsDir is the open handle for the local directory st.FSDirName. It is
	// closed and reset to nil by Close.
	fsDir vfs.File

	// tracer is only set when objiotracing.Enabled is true.
	tracer *objiotracing.Tracer

	// remote is the remote storage subsystem state (for example the cache
	// consulted by Metrics and the catalog used by CheckpointState).
	remote remoteSubsystem

	// mu protects the fields below; readers take the read lock, writers take
	// the exclusive lock.
	mu struct {
		sync.RWMutex

		remote remoteLockedState

		// TODO(radu): move these fields to a localLockedState struct.
		// localObjectsChangeCounter is incremented whenever the metadata of a
		// non-remote object is added or removed. The purpose of this counter is
		// to avoid syncing the local filesystem when only remote objects are
		// changed.
		localObjectsChangeCounter uint64
		// localObjectsChangeCounterSynced is the value of
		// localObjectsChangeCounter at the time the last completed sync was
		// launched.
		localObjectsChangeCounterSynced uint64

		// knownObjects maintains information about objects that are known to the provider.
		// It is initialized with the list of files in the manifest when we open a DB.
		knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata

		// protectedObjects are objects that cannot be unreferenced because they
		// have outstanding SharedObjectBackingHandles. The value is a count of
		// outstanding handles.
		protectedObjects map[base.DiskFileNum]int
	}
}

// Compile-time check that *provider implements objstorage.Provider.
var _ objstorage.Provider = (*provider)(nil)
63 :
// Settings that must be specified when creating the provider.
type Settings struct {
	// Logger is used by the provider for reporting; OpenForReading uses it to
	// report a fatal error when an object that must exist is missing.
	Logger base.Logger

	// Local filesystem configuration.
	FS        vfs.FS
	FSDirName string

	// FSDirInitialListing is a listing of FSDirName at the time of calling Open.
	//
	// This is an optional optimization to avoid double listing on Open when the
	// higher layer already has a listing. When nil, we obtain the listing on
	// Open.
	FSDirInitialListing []string

	// FSCleaner cleans obsolete files from the local filesystem.
	//
	// DefaultSettings uses base.DeleteCleaner.
	FSCleaner base.Cleaner

	// NoSyncOnClose decides whether the implementation will enforce a
	// close-time synchronization (e.g., fdatasync() or sync_file_range())
	// on files it writes to. Setting this to true removes the guarantee for a
	// sync on close. Some implementations can still issue a non-blocking sync.
	NoSyncOnClose bool

	// BytesPerSync enables periodic syncing of files in order to smooth out
	// writes to disk. This option does not provide any persistence guarantee, but
	// is used to avoid latency spikes if the OS automatically decides to write
	// out a large chunk of dirty filesystem buffers.
	BytesPerSync int

	// Local contains fields that are only relevant for files stored on the local
	// filesystem.
	Local struct {
		// TODO(radu): move FSCleaner, NoSyncOnClose, BytesPerSync here.

		// ReadaheadConfig is used to retrieve the current readahead mode; it is
		// consulted whenever a read handle is initialized. When nil, opening the
		// provider installs a config created by NewReadaheadConfig.
		ReadaheadConfig *ReadaheadConfig
	}

	// Fields here are set only if the provider is to support remote objects
	// (experimental).
	Remote struct {
		StorageFactory remote.StorageFactory

		// If CreateOnShared is non-zero, sstables are created on remote storage using
		// the CreateOnSharedLocator (when the PreferSharedStorage create option is
		// true).
		CreateOnShared        remote.CreateOnSharedStrategy
		CreateOnSharedLocator remote.Locator

		// CacheSizeBytes is the size of the on-disk block cache for objects
		// on remote storage. If it is 0, no cache is used.
		CacheSizeBytes int64

		// CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
		CacheBlockSize int

		// ShardingBlockSize is the size of a shard block. The cache is split into contiguous
		// ShardingBlockSize units. The units are distributed across multiple independent shards
		// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
		// policies operate at the level of shard, not whole cache. This is done to reduce lock
		// contention.
		//
		// If ShardingBlockSize is 0, the default of 1 MB is used.
		ShardingBlockSize int64

		// CacheShardCount is the number of independent shards the cache leverages.
		// Each shard is the same size, and a hash of filenum & offset maps a read
		// to a certain shard. If set to 0, 2*runtime.GOMAXPROCS is used as the
		// shard count.
		CacheShardCount int

		// TODO(radu): allow the cache to live on another FS/location (e.g. to use
		// instance-local SSD).
	}
}
142 :
// ReadaheadConfig holds the pair of read-ahead modes used by the provider:
//   - the Informed mode applies to operations that are known to read a large
//     consecutive chunk of a file;
//   - the Speculative mode applies automatically once consecutive reads are
//     detected.
//
// Both modes are packed into a single atomic word, so they can be updated and
// observed atomically.
type ReadaheadConfig struct {
	// value stores the informed mode in bits 0-7 and the speculative mode in
	// bits 8-15.
	value atomic.Uint32
}

// Readahead modes used when no explicit configuration is supplied.
const (
	defaultReadaheadInformed    = FadviseSequential
	defaultReadaheadSpeculative = FadviseSequential
)

// NewReadaheadConfig allocates a readahead config container that starts out
// with the default informed and speculative modes.
func NewReadaheadConfig() *ReadaheadConfig {
	cfg := new(ReadaheadConfig)
	cfg.Set(defaultReadaheadInformed, defaultReadaheadSpeculative)
	return cfg
}

// Set atomically replaces both the informed and the speculative readahead
// modes.
func (rc *ReadaheadConfig) Set(informed, speculative ReadaheadMode) {
	low := uint32(informed)
	high := uint32(speculative) << 8
	rc.value.Store(high | low)
}

// Informed reports the read-ahead mode for operations that are known to read a
// large consecutive chunk of a file.
func (rc *ReadaheadConfig) Informed() ReadaheadMode {
	packed := rc.value.Load()
	return ReadaheadMode(packed & 0xff)
}

// Speculative reports the read-ahead mode that is applied automatically when
// consecutive reads are detected.
func (rc *ReadaheadConfig) Speculative() ReadaheadMode {
	packed := rc.value.Load()
	return ReadaheadMode(packed >> 8)
}

// ReadaheadMode selects the kind of read-ahead to perform, for either informed
// read-ahead (e.g. compactions) or speculative read-ahead.
type ReadaheadMode uint8

const (
	// NoReadahead turns read-ahead off entirely.
	NoReadahead ReadaheadMode = iota

	// SysReadahead prefetches data with the SYS_READAHEAD call. The prefetch
	// window grows dynamically as consecutive reads are detected.
	SysReadahead

	// FadviseSequential uses FADV_SEQUENTIAL. Informed read-ahead applies
	// FADV_SEQUENTIAL from the beginning. Speculative read-ahead starts with
	// SYS_READAHEAD until the window reaches the maximum size and only then
	// switches to FADV_SEQUENTIAL.
	FadviseSequential
)
206 :
207 : // DefaultSettings initializes default settings (with no remote storage),
208 : // suitable for tests and tools.
209 1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
210 1 : return Settings{
211 1 : Logger: base.DefaultLogger,
212 1 : FS: fs,
213 1 : FSDirName: dirName,
214 1 : FSCleaner: base.DeleteCleaner{},
215 1 : NoSyncOnClose: false,
216 1 : BytesPerSync: 512 * 1024, // 512KB
217 1 : }
218 1 : }
219 :
220 : // Open creates the provider.
221 2 : func Open(settings Settings) (objstorage.Provider, error) {
222 2 : // Note: we can't just `return open(settings)` because in an error case we
223 2 : // would return (*provider)(nil) which is not objstorage.Provider(nil).
224 2 : p, err := open(settings)
225 2 : if err != nil {
226 1 : return nil, err
227 1 : }
228 2 : return p, nil
229 : }
230 :
231 2 : func open(settings Settings) (p *provider, _ error) {
232 2 : fsDir, err := settings.FS.OpenDir(settings.FSDirName)
233 2 : if err != nil {
234 1 : return nil, err
235 1 : }
236 :
237 2 : defer func() {
238 2 : if p == nil {
239 0 : fsDir.Close()
240 0 : }
241 : }()
242 :
243 2 : if settings.Local.ReadaheadConfig == nil {
244 2 : settings.Local.ReadaheadConfig = NewReadaheadConfig()
245 2 : }
246 :
247 2 : p = &provider{
248 2 : st: settings,
249 2 : fsDir: fsDir,
250 2 : }
251 2 : p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
252 2 : p.mu.protectedObjects = make(map[base.DiskFileNum]int)
253 2 :
254 2 : if objiotracing.Enabled {
255 0 : p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
256 0 : }
257 :
258 : // Add local FS objects.
259 2 : if err := p.vfsInit(); err != nil {
260 0 : return nil, err
261 0 : }
262 :
263 : // Initialize remote subsystem (if configured) and add remote objects.
264 2 : if err := p.remoteInit(); err != nil {
265 0 : return nil, err
266 0 : }
267 :
268 2 : return p, nil
269 : }
270 :
271 : // Close is part of the objstorage.Provider interface.
272 2 : func (p *provider) Close() error {
273 2 : err := p.sharedClose()
274 2 : if p.fsDir != nil {
275 2 : err = firstError(err, p.fsDir.Close())
276 2 : p.fsDir = nil
277 2 : }
278 2 : if objiotracing.Enabled {
279 0 : if p.tracer != nil {
280 0 : p.tracer.Close()
281 0 : p.tracer = nil
282 0 : }
283 : }
284 2 : return err
285 : }
286 :
287 : // OpenForReading opens an existing object.
288 : func (p *provider) OpenForReading(
289 : ctx context.Context,
290 : fileType base.FileType,
291 : fileNum base.DiskFileNum,
292 : opts objstorage.OpenOptions,
293 2 : ) (objstorage.Readable, error) {
294 2 : meta, err := p.Lookup(fileType, fileNum)
295 2 : if err != nil {
296 1 : if opts.MustExist {
297 0 : p.st.Logger.Fatalf("%v", err)
298 0 : }
299 1 : return nil, err
300 : }
301 :
302 2 : var r objstorage.Readable
303 2 : if !meta.IsRemote() {
304 2 : r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
305 2 : } else {
306 2 : r, err = p.remoteOpenForReading(ctx, meta, opts)
307 2 : if err != nil && p.isNotExistError(meta, err) {
308 1 : // Wrap the error so that IsNotExistError functions properly.
309 1 : err = errors.Mark(err, os.ErrNotExist)
310 1 : }
311 : }
312 2 : if err != nil {
313 1 : return nil, err
314 1 : }
315 2 : if objiotracing.Enabled {
316 0 : r = p.tracer.WrapReadable(ctx, r, fileNum)
317 0 : }
318 2 : return r, nil
319 : }
320 :
321 : // Create creates a new object and opens it for writing.
322 : //
323 : // The object is not guaranteed to be durable (accessible in case of crashes)
324 : // until Sync is called.
325 : func (p *provider) Create(
326 : ctx context.Context,
327 : fileType base.FileType,
328 : fileNum base.DiskFileNum,
329 : opts objstorage.CreateOptions,
330 2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
331 2 : if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
332 2 : w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
333 2 : } else {
334 2 : var category vfs.DiskWriteCategory
335 2 : if opts.WriteCategory != "" {
336 2 : category = opts.WriteCategory
337 2 : } else {
338 2 : category = vfs.WriteCategoryUnspecified
339 2 : }
340 2 : w, meta, err = p.vfsCreate(ctx, fileType, fileNum, category)
341 : }
342 2 : if err != nil {
343 1 : err = errors.Wrapf(err, "creating object %s", fileNum)
344 1 : return nil, objstorage.ObjectMetadata{}, err
345 1 : }
346 2 : p.addMetadata(meta)
347 2 : if objiotracing.Enabled {
348 0 : w = p.tracer.WrapWritable(ctx, w, fileNum)
349 0 : }
350 2 : return w, meta, nil
351 : }
352 :
353 : // Remove removes an object.
354 : //
355 : // Note that if the object is remote, the object is only (conceptually) removed
356 : // from this provider. If other providers have references on the remote object,
357 : // it will not be removed.
358 : //
359 : // The object is not guaranteed to be durably removed until Sync is called.
360 2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
361 2 : meta, err := p.Lookup(fileType, fileNum)
362 2 : if err != nil {
363 1 : return err
364 1 : }
365 :
366 2 : if !meta.IsRemote() {
367 2 : err = p.vfsRemove(fileType, fileNum)
368 2 : } else {
369 2 : // TODO(radu): implement remote object removal (i.e. deref).
370 2 : err = p.sharedUnref(meta)
371 2 : if err != nil && p.isNotExistError(meta, err) {
372 0 : // Wrap the error so that IsNotExistError functions properly.
373 0 : err = errors.Mark(err, os.ErrNotExist)
374 0 : }
375 : }
376 2 : if err != nil && !p.IsNotExistError(err) {
377 1 : // We want to be able to retry a Remove, so we keep the object in our list.
378 1 : // TODO(radu): we should mark the object as "zombie" and not allow any other
379 1 : // operations.
380 1 : return errors.Wrapf(err, "removing object %s", fileNum)
381 1 : }
382 :
383 2 : p.removeMetadata(fileNum)
384 2 : return err
385 : }
386 :
387 1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
388 1 : if meta.Remote.Storage != nil {
389 1 : return meta.Remote.Storage.IsNotExistError(err)
390 1 : }
391 0 : return oserror.IsNotExist(err)
392 : }
393 :
394 : // IsNotExistError is part of the objstorage.Provider interface.
395 2 : func (p *provider) IsNotExistError(err error) bool {
396 2 : // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
397 2 : // remote.Storage.
398 2 : return oserror.IsNotExist(err)
399 2 : }
400 :
401 : // Sync flushes the metadata from creation or removal of objects since the last Sync.
402 2 : func (p *provider) Sync() error {
403 2 : if err := p.vfsSync(); err != nil {
404 1 : return err
405 1 : }
406 2 : if err := p.sharedSync(); err != nil {
407 0 : return err
408 0 : }
409 2 : return nil
410 : }
411 :
412 : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
413 : // local file or a hard link (if the new object is created on the same FS, and
414 : // if the FS supports it).
415 : //
416 : // The object is not guaranteed to be durable (accessible in case of crashes)
417 : // until Sync is called.
418 : func (p *provider) LinkOrCopyFromLocal(
419 : ctx context.Context,
420 : srcFS vfs.FS,
421 : srcFilePath string,
422 : dstFileType base.FileType,
423 : dstFileNum base.DiskFileNum,
424 : opts objstorage.CreateOptions,
425 2 : ) (objstorage.ObjectMetadata, error) {
426 2 : shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
427 2 : if !shared && srcFS == p.st.FS {
428 2 : // Wrap the normal filesystem with one which wraps newly created files with
429 2 : // vfs.NewSyncingFile.
430 2 : fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
431 2 : NoSyncOnClose: p.st.NoSyncOnClose,
432 2 : BytesPerSync: p.st.BytesPerSync,
433 2 : })
434 2 : dstPath := p.vfsPath(dstFileType, dstFileNum)
435 2 : if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
436 1 : return objstorage.ObjectMetadata{}, err
437 1 : }
438 :
439 2 : meta := objstorage.ObjectMetadata{
440 2 : DiskFileNum: dstFileNum,
441 2 : FileType: dstFileType,
442 2 : }
443 2 : p.addMetadata(meta)
444 2 : return meta, nil
445 : }
446 : // Create the object and copy the data.
447 2 : w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
448 2 : if err != nil {
449 0 : return objstorage.ObjectMetadata{}, err
450 0 : }
451 2 : f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
452 2 : if err != nil {
453 0 : return objstorage.ObjectMetadata{}, err
454 0 : }
455 2 : defer f.Close()
456 2 : buf := make([]byte, 64*1024)
457 2 : for {
458 2 : n, readErr := f.Read(buf)
459 2 : if readErr != nil && readErr != io.EOF {
460 0 : w.Abort()
461 0 : return objstorage.ObjectMetadata{}, readErr
462 0 : }
463 :
464 2 : if n > 0 {
465 2 : if err := w.Write(buf[:n]); err != nil {
466 0 : w.Abort()
467 0 : return objstorage.ObjectMetadata{}, err
468 0 : }
469 : }
470 :
471 2 : if readErr == io.EOF {
472 2 : break
473 : }
474 : }
475 2 : if err := w.Finish(); err != nil {
476 0 : return objstorage.ObjectMetadata{}, err
477 0 : }
478 2 : return meta, nil
479 : }
480 :
481 : // Lookup is part of the objstorage.Provider interface.
482 : func (p *provider) Lookup(
483 : fileType base.FileType, fileNum base.DiskFileNum,
484 2 : ) (objstorage.ObjectMetadata, error) {
485 2 : p.mu.RLock()
486 2 : defer p.mu.RUnlock()
487 2 : meta, ok := p.mu.knownObjects[fileNum]
488 2 : if !ok {
489 2 : return objstorage.ObjectMetadata{}, errors.Wrapf(
490 2 : os.ErrNotExist,
491 2 : "file %s (type %d) unknown to the objstorage provider",
492 2 : fileNum, errors.Safe(fileType),
493 2 : )
494 2 : }
495 2 : if meta.FileType != fileType {
496 0 : return objstorage.ObjectMetadata{}, base.AssertionFailedf(
497 0 : "file %s type mismatch (known type %d, expected type %d)",
498 0 : fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
499 0 : )
500 0 : }
501 2 : return meta, nil
502 : }
503 :
504 : // Path is part of the objstorage.Provider interface.
505 2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
506 2 : if !meta.IsRemote() {
507 2 : return p.vfsPath(meta.FileType, meta.DiskFileNum)
508 2 : }
509 2 : return p.remotePath(meta)
510 : }
511 :
512 : // Size returns the size of the object.
513 2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
514 2 : if !meta.IsRemote() {
515 2 : return p.vfsSize(meta.FileType, meta.DiskFileNum)
516 2 : }
517 2 : return p.remoteSize(meta)
518 : }
519 :
520 : // List is part of the objstorage.Provider interface.
521 2 : func (p *provider) List() []objstorage.ObjectMetadata {
522 2 : p.mu.RLock()
523 2 : defer p.mu.RUnlock()
524 2 : res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
525 2 : for _, meta := range p.mu.knownObjects {
526 2 : res = append(res, meta)
527 2 : }
528 2 : slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
529 2 : return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
530 2 : })
531 2 : return res
532 : }
533 :
534 : // Metrics is part of the objstorage.Provider interface.
535 2 : func (p *provider) Metrics() sharedcache.Metrics {
536 2 : if p.remote.cache != nil {
537 1 : return p.remote.cache.Metrics()
538 1 : }
539 2 : return sharedcache.Metrics{}
540 : }
541 :
542 : // CheckpointState is part of the objstorage.Provider interface.
543 : func (p *provider) CheckpointState(
544 : fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
545 1 : ) error {
546 1 : p.mu.Lock()
547 1 : defer p.mu.Unlock()
548 1 : for i := range fileNums {
549 1 : if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
550 0 : return errors.Wrapf(
551 0 : os.ErrNotExist,
552 0 : "file %s (type %d) unknown to the objstorage provider",
553 0 : fileNums[i], errors.Safe(fileType),
554 0 : )
555 0 : }
556 : // Prevent this object from deletion, at least for the life of this instance.
557 1 : p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
558 : }
559 :
560 1 : if p.remote.catalog != nil {
561 1 : return p.remote.catalog.Checkpoint(fs, dir)
562 1 : }
563 0 : return nil
564 : }
565 :
566 2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
567 2 : p.mu.Lock()
568 2 : defer p.mu.Unlock()
569 2 : p.addMetadataLocked(meta)
570 2 : }
571 :
572 2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
573 2 : if invariants.Enabled {
574 2 : meta.AssertValid()
575 2 : }
576 2 : p.mu.knownObjects[meta.DiskFileNum] = meta
577 2 : if meta.IsRemote() {
578 2 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
579 2 : FileNum: meta.DiskFileNum,
580 2 : FileType: meta.FileType,
581 2 : CreatorID: meta.Remote.CreatorID,
582 2 : CreatorFileNum: meta.Remote.CreatorFileNum,
583 2 : Locator: meta.Remote.Locator,
584 2 : CleanupMethod: meta.Remote.CleanupMethod,
585 2 : CustomObjectName: meta.Remote.CustomObjectName,
586 2 : })
587 2 : if meta.IsExternal() {
588 2 : p.mu.remote.addExternalObject(meta)
589 2 : }
590 2 : } else {
591 2 : p.mu.localObjectsChangeCounter++
592 2 : }
593 : }
594 :
595 2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
596 2 : p.mu.Lock()
597 2 : defer p.mu.Unlock()
598 2 :
599 2 : meta, ok := p.mu.knownObjects[fileNum]
600 2 : if !ok {
601 0 : return
602 0 : }
603 2 : delete(p.mu.knownObjects, fileNum)
604 2 : if meta.IsExternal() {
605 2 : p.mu.remote.removeExternalObject(meta)
606 2 : }
607 2 : if meta.IsRemote() {
608 2 : p.mu.remote.catalogBatch.DeleteObject(fileNum)
609 2 : } else {
610 2 : p.mu.localObjectsChangeCounter++
611 2 : }
612 : }
613 :
614 : // protectObject prevents the unreferencing of a remote object until
615 : // unprotectObject is called.
616 2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
617 2 : p.mu.Lock()
618 2 : defer p.mu.Unlock()
619 2 : p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
620 2 : }
621 :
622 1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
623 1 : p.mu.Lock()
624 1 : defer p.mu.Unlock()
625 1 : v := p.mu.protectedObjects[fileNum]
626 1 : if invariants.Enabled && v == 0 {
627 0 : panic("invalid protection count")
628 : }
629 1 : if v > 1 {
630 0 : p.mu.protectedObjects[fileNum] = v - 1
631 1 : } else {
632 1 : delete(p.mu.protectedObjects, fileNum)
633 1 : // TODO(radu): check if the object is still in knownObject; if not, unref it
634 1 : // now.
635 1 : }
636 : }
637 :
638 2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
639 2 : p.mu.Lock()
640 2 : defer p.mu.Unlock()
641 2 : return p.mu.protectedObjects[fileNum] > 0
642 2 : }
|