LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - provider.go (source / functions) Hit Total Coverage
Test: 2024-09-24 08:18Z 785dc8d8 - tests + meta.lcov Lines: 276 328 84.1 %
Date: 2024-09-24 08:19:01 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "io"
      11             :         "os"
      12             :         "slices"
      13             :         "sync"
      14             :         "sync/atomic"
      15             : 
      16             :         "github.com/cockroachdb/errors"
      17             :         "github.com/cockroachdb/errors/oserror"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/invariants"
      20             :         "github.com/cockroachdb/pebble/objstorage"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      22             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      23             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      24             :         "github.com/cockroachdb/pebble/objstorage/remote"
      25             :         "github.com/cockroachdb/pebble/vfs"
      26             : )
      27             : 
      28             : // provider is the implementation of objstorage.Provider.
      29             : type provider struct {
      30             :         st Settings
      31             : 
      32             :         fsDir vfs.File
      33             : 
      34             :         tracer *objiotracing.Tracer
      35             : 
      36             :         remote remoteSubsystem
      37             : 
      38             :         mu struct {
      39             :                 sync.RWMutex
      40             : 
      41             :                 remote remoteLockedState
      42             : 
      43             :                 // TODO(radu): move these fields to a localLockedState struct.
      44             :                 // localObjectsChangeCounter is incremented whenever non-remote objects are created.
      45             :                 // The purpose of this counter is to avoid syncing the local filesystem when
      46             :                 // only remote objects are changed.
      47             :                 localObjectsChangeCounter uint64
      48             :                 // localObjectsChangeCounterSynced is the value of localObjectsChangeCounter
      49             :                 // at the time the last completed sync was launched.
      50             :                 localObjectsChangeCounterSynced uint64
      51             : 
      52             :                 // knownObjects maintains information about objects that are known to the provider.
      53             :                 // It is initialized with the list of files in the manifest when we open a DB.
      54             :                 knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata
      55             : 
      56             :                 // protectedObjects are objects that cannot be unreferenced because they
      57             :                 // have outstanding SharedObjectBackingHandles. The value is a count of outstanding handles.
      58             :                 protectedObjects map[base.DiskFileNum]int
      59             :         }
      60             : }
      61             : 
      62             : var _ objstorage.Provider = (*provider)(nil)
      63             : 
      64             : // Settings that must be specified when creating the provider.
      65             : type Settings struct {
      66             :         Logger base.Logger
      67             : 
      68             :         // Local filesystem configuration.
      69             :         FS        vfs.FS
      70             :         FSDirName string
      71             : 
      72             :         // FSDirInitialListing is a listing of FSDirName at the time of calling Open.
      73             :         //
      74             :         // This is an optional optimization to avoid double listing on Open when the
      75             :         // higher layer already has a listing. When nil, we obtain the listing on
      76             :         // Open.
      77             :         FSDirInitialListing []string
      78             : 
      79             :         // FSCleaner cleans obsolete files from the local filesystem.
      80             :         //
      81             :         // DefaultSettings uses base.DeleteCleaner.
      82             :         FSCleaner base.Cleaner
      83             : 
      84             :         // NoSyncOnClose decides whether the implementation will enforce a
      85             :         // close-time synchronization (e.g., fdatasync() or sync_file_range())
      86             :         // on files it writes to. Setting this to true removes the guarantee for a
      87             :         // sync on close. Some implementations can still issue a non-blocking sync.
      88             :         NoSyncOnClose bool
      89             : 
      90             :         // BytesPerSync enables periodic syncing of files in order to smooth out
      91             :         // writes to disk. This option does not provide any persistence guarantee, but
      92             :         // is used to avoid latency spikes if the OS automatically decides to write
      93             :         // out a large chunk of dirty filesystem buffers.
      94             :         BytesPerSync int
      95             : 
      96             :         // Local contains fields that are only relevant for files stored on the local
      97             :         // filesystem.
      98             :         Local struct {
      99             :                 // TODO(radu): move FSCleaner, NoSyncOnClose, BytesPerSync here.
     100             : 
     101             :                 // ReadaheadConfig is used to retrieve the current readahead mode; it is
     102             :                 // consulted whenever a read handle is initialized.
     103             :                 ReadaheadConfig *ReadaheadConfig
     104             :         }
     105             : 
     106             :         // Fields here are set only if the provider is to support remote objects
     107             :         // (experimental).
     108             :         Remote struct {
     109             :                 StorageFactory remote.StorageFactory
     110             : 
     111             :                 // If CreateOnShared is non-zero, sstables are created on remote storage using
     112             :                 // the CreateOnSharedLocator (when the PreferSharedStorage create option is
     113             :                 // true).
     114             :                 CreateOnShared        remote.CreateOnSharedStrategy
     115             :                 CreateOnSharedLocator remote.Locator
     116             : 
     117             :                 // CacheSizeBytes is the size of the on-disk block cache for objects
     118             :                 // on remote storage. If it is 0, no cache is used.
     119             :                 CacheSizeBytes int64
     120             : 
     121             :                 // CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
     122             :                 CacheBlockSize int
     123             : 
     124             :                 // ShardingBlockSize is the size of a shard block. The cache is split into contiguous
     125             :                 // ShardingBlockSize units. The units are distributed across multiple independent shards
     126             :                 // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
     127             :                 // policies operate at the level of shard, not whole cache. This is done to reduce lock
     128             :                 // contention.
     129             :                 //
     130             :                 // If ShardingBlockSize is 0, the default of 1 MB is used.
     131             :                 ShardingBlockSize int64
     132             : 
     133             :                 // The number of independent shards the cache leverages. Each shard is the same size,
     134             :                 // and a hash of filenum & offset maps a read to a certain shard. If set to 0,
     135             :                 // 2*runtime.GOMAXPROCS is used as the shard count.
     136             :                 CacheShardCount int
     137             : 
     138             :                 // TODO(radu): allow the cache to live on another FS/location (e.g. to use
     139             :                 // instance-local SSD).
     140             :         }
     141             : }
     142             : 
     143             : // ReadaheadConfig is a container for the settings that control the use of
     144             : // read-ahead.
     145             : //
     146             : // It stores two ReadaheadModes:
     147             : //   - Informed is the type of read-ahead for operations that are known to read a
     148             : //     large consecutive chunk of a file.
     149             : //   - Speculative is the type of read-ahead used automatically, when consecutive
     150             : //     reads are detected.
     151             : //
     152             : // The settings can be changed and read atomically.
     153             : type ReadaheadConfig struct {
     154             :         value atomic.Uint32
     155             : }
     156             : 
     157             : // These are the default readahead modes when a config is not specified.
     158             : const (
     159             :         defaultReadaheadInformed    = FadviseSequential
     160             :         defaultReadaheadSpeculative = FadviseSequential
     161             : )
     162             : 
     163             : // NewReadaheadConfig returns a new readahead config container initialized with
     164             : // default values.
     165           2 : func NewReadaheadConfig() *ReadaheadConfig {
     166           2 :         rc := &ReadaheadConfig{}
     167           2 :         rc.Set(defaultReadaheadInformed, defaultReadaheadSpeculative)
     168           2 :         return rc
     169           2 : }
     170             : 
     171             : // Set the informed and speculative readahead modes.
     172           2 : func (rc *ReadaheadConfig) Set(informed, speculative ReadaheadMode) {
     173           2 :         rc.value.Store(uint32(speculative)<<8 | uint32(informed))
     174           2 : }
     175             : 
     176             : // Informed returns the type of read-ahead for operations that are known to read
     177             : // a large consecutive chunk of a file.
     178           2 : func (rc *ReadaheadConfig) Informed() ReadaheadMode {
     179           2 :         return ReadaheadMode(rc.value.Load() & 0xff)
     180           2 : }
     181             : 
     182             : // Speculative returns the type of read-ahead used automatically, when
     183             : // consecutive reads are detected.
     184           2 : func (rc *ReadaheadConfig) Speculative() ReadaheadMode {
     185           2 :         return ReadaheadMode(rc.value.Load() >> 8)
     186           2 : }
     187             : 
     188             : // ReadaheadMode indicates the type of read-ahead to use, either for informed
     189             : // read-ahead (e.g. compactions) or speculative read-ahead.
     190             : type ReadaheadMode uint8
     191             : 
     192             : const (
     193             :         // NoReadahead disables readahead altogether.
     194             :         NoReadahead ReadaheadMode = iota
     195             : 
     196             :         // SysReadahead enables the use of SYS_READAHEAD call to prefetch data.
     197             :         // The prefetch window grows dynamically as consecutive reads are detected.
     198             :         SysReadahead
     199             : 
     200             :         // FadviseSequential enables the use of FADV_SEQUENTIAL. For informed
     201             :         // read-ahead, FADV_SEQUENTIAL is used from the beginning. For speculative
     202             :         // read-ahead, SYS_READAHEAD is first used until the window reaches the
     203             :         // maximum size, then we switch to FADV_SEQUENTIAL.
     204             :         FadviseSequential
     205             : )
     206             : 
     207             : // DefaultSettings initializes default settings (with no remote storage),
     208             : // suitable for tests and tools.
     209           1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
     210           1 :         return Settings{
     211           1 :                 Logger:        base.DefaultLogger,
     212           1 :                 FS:            fs,
     213           1 :                 FSDirName:     dirName,
     214           1 :                 FSCleaner:     base.DeleteCleaner{},
     215           1 :                 NoSyncOnClose: false,
     216           1 :                 BytesPerSync:  512 * 1024, // 512KB
     217           1 :         }
     218           1 : }
     219             : 
     220             : // Open creates the provider.
     221           2 : func Open(settings Settings) (objstorage.Provider, error) {
     222           2 :         // Note: we can't just `return open(settings)` because in an error case we
     223           2 :         // would return (*provider)(nil) which is not objstorage.Provider(nil).
     224           2 :         p, err := open(settings)
     225           2 :         if err != nil {
     226           1 :                 return nil, err
     227           1 :         }
     228           2 :         return p, nil
     229             : }
     230             : 
     231           2 : func open(settings Settings) (p *provider, _ error) {
     232           2 :         fsDir, err := settings.FS.OpenDir(settings.FSDirName)
     233           2 :         if err != nil {
     234           1 :                 return nil, err
     235           1 :         }
     236             : 
     237           2 :         defer func() {
     238           2 :                 if p == nil {
     239           0 :                         fsDir.Close()
     240           0 :                 }
     241             :         }()
     242             : 
     243           2 :         if settings.Local.ReadaheadConfig == nil {
     244           2 :                 settings.Local.ReadaheadConfig = NewReadaheadConfig()
     245           2 :         }
     246             : 
     247           2 :         p = &provider{
     248           2 :                 st:    settings,
     249           2 :                 fsDir: fsDir,
     250           2 :         }
     251           2 :         p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
     252           2 :         p.mu.protectedObjects = make(map[base.DiskFileNum]int)
     253           2 : 
     254           2 :         if objiotracing.Enabled {
     255           0 :                 p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
     256           0 :         }
     257             : 
     258             :         // Add local FS objects.
     259           2 :         if err := p.vfsInit(); err != nil {
     260           0 :                 return nil, err
     261           0 :         }
     262             : 
     263             :         // Initialize remote subsystem (if configured) and add remote objects.
     264           2 :         if err := p.remoteInit(); err != nil {
     265           0 :                 return nil, err
     266           0 :         }
     267             : 
     268           2 :         return p, nil
     269             : }
     270             : 
     271             : // Close is part of the objstorage.Provider interface.
     272           2 : func (p *provider) Close() error {
     273           2 :         err := p.sharedClose()
     274           2 :         if p.fsDir != nil {
     275           2 :                 err = firstError(err, p.fsDir.Close())
     276           2 :                 p.fsDir = nil
     277           2 :         }
     278           2 :         if objiotracing.Enabled {
     279           0 :                 if p.tracer != nil {
     280           0 :                         p.tracer.Close()
     281           0 :                         p.tracer = nil
     282           0 :                 }
     283             :         }
     284           2 :         return err
     285             : }
     286             : 
     287             : // OpenForReading opens an existing object.
     288             : func (p *provider) OpenForReading(
     289             :         ctx context.Context,
     290             :         fileType base.FileType,
     291             :         fileNum base.DiskFileNum,
     292             :         opts objstorage.OpenOptions,
     293           2 : ) (objstorage.Readable, error) {
     294           2 :         meta, err := p.Lookup(fileType, fileNum)
     295           2 :         if err != nil {
     296           1 :                 if opts.MustExist {
     297           0 :                         p.st.Logger.Fatalf("%v", err)
     298           0 :                 }
     299           1 :                 return nil, err
     300             :         }
     301             : 
     302           2 :         var r objstorage.Readable
     303           2 :         if !meta.IsRemote() {
     304           2 :                 r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
     305           2 :         } else {
     306           2 :                 r, err = p.remoteOpenForReading(ctx, meta, opts)
     307           2 :                 if err != nil && p.isNotExistError(meta, err) {
     308           1 :                         // Wrap the error so that IsNotExistError functions properly.
     309           1 :                         err = errors.Mark(err, os.ErrNotExist)
     310           1 :                 }
     311             :         }
     312           2 :         if err != nil {
     313           1 :                 return nil, err
     314           1 :         }
     315           2 :         if objiotracing.Enabled {
     316           0 :                 r = p.tracer.WrapReadable(ctx, r, fileNum)
     317           0 :         }
     318           2 :         return r, nil
     319             : }
     320             : 
     321             : // Create creates a new object and opens it for writing.
     322             : //
     323             : // The object is not guaranteed to be durable (accessible in case of crashes)
     324             : // until Sync is called.
     325             : func (p *provider) Create(
     326             :         ctx context.Context,
     327             :         fileType base.FileType,
     328             :         fileNum base.DiskFileNum,
     329             :         opts objstorage.CreateOptions,
     330           2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
     331           2 :         if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
     332           2 :                 w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
     333           2 :         } else {
     334           2 :                 var category vfs.DiskWriteCategory
     335           2 :                 if opts.WriteCategory != "" {
     336           2 :                         category = opts.WriteCategory
     337           2 :                 } else {
     338           2 :                         category = vfs.WriteCategoryUnspecified
     339           2 :                 }
     340           2 :                 w, meta, err = p.vfsCreate(ctx, fileType, fileNum, category)
     341             :         }
     342           2 :         if err != nil {
     343           1 :                 err = errors.Wrapf(err, "creating object %s", fileNum)
     344           1 :                 return nil, objstorage.ObjectMetadata{}, err
     345           1 :         }
     346           2 :         p.addMetadata(meta)
     347           2 :         if objiotracing.Enabled {
     348           0 :                 w = p.tracer.WrapWritable(ctx, w, fileNum)
     349           0 :         }
     350           2 :         return w, meta, nil
     351             : }
     352             : 
     353             : // Remove removes an object.
     354             : //
     355             : // Note that if the object is remote, the object is only (conceptually) removed
     356             : // from this provider. If other providers have references on the remote object,
     357             : // it will not be removed.
     358             : //
     359             : // The object is not guaranteed to be durably removed until Sync is called.
     360           2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
     361           2 :         meta, err := p.Lookup(fileType, fileNum)
     362           2 :         if err != nil {
     363           1 :                 return err
     364           1 :         }
     365             : 
     366           2 :         if !meta.IsRemote() {
     367           2 :                 err = p.vfsRemove(fileType, fileNum)
     368           2 :         } else {
     369           2 :                 // TODO(radu): implement remote object removal (i.e. deref).
     370           2 :                 err = p.sharedUnref(meta)
     371           2 :                 if err != nil && p.isNotExistError(meta, err) {
     372           0 :                         // Wrap the error so that IsNotExistError functions properly.
     373           0 :                         err = errors.Mark(err, os.ErrNotExist)
     374           0 :                 }
     375             :         }
     376           2 :         if err != nil && !p.IsNotExistError(err) {
     377           1 :                 // We want to be able to retry a Remove, so we keep the object in our list.
     378           1 :                 // TODO(radu): we should mark the object as "zombie" and not allow any other
     379           1 :                 // operations.
     380           1 :                 return errors.Wrapf(err, "removing object %s", fileNum)
     381           1 :         }
     382             : 
     383           2 :         p.removeMetadata(fileNum)
     384           2 :         return err
     385             : }
     386             : 
     387           1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
     388           1 :         if meta.Remote.Storage != nil {
     389           1 :                 return meta.Remote.Storage.IsNotExistError(err)
     390           1 :         }
     391           0 :         return oserror.IsNotExist(err)
     392             : }
     393             : 
     394             : // IsNotExistError is part of the objstorage.Provider interface.
     395           2 : func (p *provider) IsNotExistError(err error) bool {
     396           2 :         // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
     397           2 :         // remote.Storage.
     398           2 :         return oserror.IsNotExist(err)
     399           2 : }
     400             : 
     401             : // Sync flushes the metadata from creation or removal of objects since the last Sync.
     402           2 : func (p *provider) Sync() error {
     403           2 :         if err := p.vfsSync(); err != nil {
     404           1 :                 return err
     405           1 :         }
     406           2 :         if err := p.sharedSync(); err != nil {
     407           0 :                 return err
     408           0 :         }
     409           2 :         return nil
     410             : }
     411             : 
     412             : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
     413             : // local file or a hard link (if the new object is created on the same FS, and
     414             : // if the FS supports it).
     415             : //
     416             : // The object is not guaranteed to be durable (accessible in case of crashes)
     417             : // until Sync is called.
     418             : func (p *provider) LinkOrCopyFromLocal(
     419             :         ctx context.Context,
     420             :         srcFS vfs.FS,
     421             :         srcFilePath string,
     422             :         dstFileType base.FileType,
     423             :         dstFileNum base.DiskFileNum,
     424             :         opts objstorage.CreateOptions,
     425           2 : ) (objstorage.ObjectMetadata, error) {
     426           2 :         shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
     427           2 :         if !shared && srcFS == p.st.FS {
     428           2 :                 // Wrap the normal filesystem with one which wraps newly created files with
     429           2 :                 // vfs.NewSyncingFile.
     430           2 :                 fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
     431           2 :                         NoSyncOnClose: p.st.NoSyncOnClose,
     432           2 :                         BytesPerSync:  p.st.BytesPerSync,
     433           2 :                 })
     434           2 :                 dstPath := p.vfsPath(dstFileType, dstFileNum)
     435           2 :                 if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
     436           1 :                         return objstorage.ObjectMetadata{}, err
     437           1 :                 }
     438             : 
     439           2 :                 meta := objstorage.ObjectMetadata{
     440           2 :                         DiskFileNum: dstFileNum,
     441           2 :                         FileType:    dstFileType,
     442           2 :                 }
     443           2 :                 p.addMetadata(meta)
     444           2 :                 return meta, nil
     445             :         }
     446             :         // Create the object and copy the data.
     447           2 :         w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
     448           2 :         if err != nil {
     449           0 :                 return objstorage.ObjectMetadata{}, err
     450           0 :         }
     451           2 :         f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
     452           2 :         if err != nil {
     453           0 :                 return objstorage.ObjectMetadata{}, err
     454           0 :         }
     455           2 :         defer f.Close()
     456           2 :         buf := make([]byte, 64*1024)
     457           2 :         for {
     458           2 :                 n, readErr := f.Read(buf)
     459           2 :                 if readErr != nil && readErr != io.EOF {
     460           0 :                         w.Abort()
     461           0 :                         return objstorage.ObjectMetadata{}, readErr
     462           0 :                 }
     463             : 
     464           2 :                 if n > 0 {
     465           2 :                         if err := w.Write(buf[:n]); err != nil {
     466           0 :                                 w.Abort()
     467           0 :                                 return objstorage.ObjectMetadata{}, err
     468           0 :                         }
     469             :                 }
     470             : 
     471           2 :                 if readErr == io.EOF {
     472           2 :                         break
     473             :                 }
     474             :         }
     475           2 :         if err := w.Finish(); err != nil {
     476           0 :                 return objstorage.ObjectMetadata{}, err
     477           0 :         }
     478           2 :         return meta, nil
     479             : }
     480             : 
     481             : // Lookup is part of the objstorage.Provider interface.
     482             : func (p *provider) Lookup(
     483             :         fileType base.FileType, fileNum base.DiskFileNum,
     484           2 : ) (objstorage.ObjectMetadata, error) {
     485           2 :         p.mu.RLock()
     486           2 :         defer p.mu.RUnlock()
     487           2 :         meta, ok := p.mu.knownObjects[fileNum]
     488           2 :         if !ok {
     489           2 :                 return objstorage.ObjectMetadata{}, errors.Wrapf(
     490           2 :                         os.ErrNotExist,
     491           2 :                         "file %s (type %d) unknown to the objstorage provider",
     492           2 :                         fileNum, errors.Safe(fileType),
     493           2 :                 )
     494           2 :         }
     495           2 :         if meta.FileType != fileType {
     496           0 :                 return objstorage.ObjectMetadata{}, base.AssertionFailedf(
     497           0 :                         "file %s type mismatch (known type %d, expected type %d)",
     498           0 :                         fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
     499           0 :                 )
     500           0 :         }
     501           2 :         return meta, nil
     502             : }
     503             : 
     504             : // Path is part of the objstorage.Provider interface.
     505           2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
     506           2 :         if !meta.IsRemote() {
     507           2 :                 return p.vfsPath(meta.FileType, meta.DiskFileNum)
     508           2 :         }
     509           2 :         return p.remotePath(meta)
     510             : }
     511             : 
     512             : // Size returns the size of the object.
     513           2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
     514           2 :         if !meta.IsRemote() {
     515           2 :                 return p.vfsSize(meta.FileType, meta.DiskFileNum)
     516           2 :         }
     517           2 :         return p.remoteSize(meta)
     518             : }
     519             : 
     520             : // List is part of the objstorage.Provider interface.
     521           2 : func (p *provider) List() []objstorage.ObjectMetadata {
     522           2 :         p.mu.RLock()
     523           2 :         defer p.mu.RUnlock()
     524           2 :         res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
     525           2 :         for _, meta := range p.mu.knownObjects {
     526           2 :                 res = append(res, meta)
     527           2 :         }
     528           2 :         slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
     529           2 :                 return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
     530           2 :         })
     531           2 :         return res
     532             : }
     533             : 
     534             : // Metrics is part of the objstorage.Provider interface.
     535           2 : func (p *provider) Metrics() sharedcache.Metrics {
     536           2 :         if p.remote.cache != nil {
     537           1 :                 return p.remote.cache.Metrics()
     538           1 :         }
     539           2 :         return sharedcache.Metrics{}
     540             : }
     541             : 
     542             : // CheckpointState is part of the objstorage.Provider interface.
     543             : func (p *provider) CheckpointState(
     544             :         fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
     545           1 : ) error {
     546           1 :         p.mu.Lock()
     547           1 :         defer p.mu.Unlock()
     548           1 :         for i := range fileNums {
     549           1 :                 if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
     550           0 :                         return errors.Wrapf(
     551           0 :                                 os.ErrNotExist,
     552           0 :                                 "file %s (type %d) unknown to the objstorage provider",
     553           0 :                                 fileNums[i], errors.Safe(fileType),
     554           0 :                         )
     555           0 :                 }
     556             :                 // Prevent this object from deletion, at least for the life of this instance.
     557           1 :                 p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
     558             :         }
     559             : 
     560           1 :         if p.remote.catalog != nil {
     561           1 :                 return p.remote.catalog.Checkpoint(fs, dir)
     562           1 :         }
     563           0 :         return nil
     564             : }
     565             : 
     566           2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
     567           2 :         p.mu.Lock()
     568           2 :         defer p.mu.Unlock()
     569           2 :         p.addMetadataLocked(meta)
     570           2 : }
     571             : 
     572           2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
     573           2 :         if invariants.Enabled {
     574           2 :                 meta.AssertValid()
     575           2 :         }
     576           2 :         p.mu.knownObjects[meta.DiskFileNum] = meta
     577           2 :         if meta.IsRemote() {
     578           2 :                 p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
     579           2 :                         FileNum:          meta.DiskFileNum,
     580           2 :                         FileType:         meta.FileType,
     581           2 :                         CreatorID:        meta.Remote.CreatorID,
     582           2 :                         CreatorFileNum:   meta.Remote.CreatorFileNum,
     583           2 :                         Locator:          meta.Remote.Locator,
     584           2 :                         CleanupMethod:    meta.Remote.CleanupMethod,
     585           2 :                         CustomObjectName: meta.Remote.CustomObjectName,
     586           2 :                 })
     587           2 :                 if meta.IsExternal() {
     588           2 :                         p.mu.remote.addExternalObject(meta)
     589           2 :                 }
     590           2 :         } else {
     591           2 :                 p.mu.localObjectsChangeCounter++
     592           2 :         }
     593             : }
     594             : 
     595           2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
     596           2 :         p.mu.Lock()
     597           2 :         defer p.mu.Unlock()
     598           2 : 
     599           2 :         meta, ok := p.mu.knownObjects[fileNum]
     600           2 :         if !ok {
     601           0 :                 return
     602           0 :         }
     603           2 :         delete(p.mu.knownObjects, fileNum)
     604           2 :         if meta.IsExternal() {
     605           2 :                 p.mu.remote.removeExternalObject(meta)
     606           2 :         }
     607           2 :         if meta.IsRemote() {
     608           2 :                 p.mu.remote.catalogBatch.DeleteObject(fileNum)
     609           2 :         } else {
     610           2 :                 p.mu.localObjectsChangeCounter++
     611           2 :         }
     612             : }
     613             : 
     614             : // protectObject prevents the unreferencing of a remote object until
     615             : // unprotectObject is called.
     616           2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
     617           2 :         p.mu.Lock()
     618           2 :         defer p.mu.Unlock()
     619           2 :         p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
     620           2 : }
     621             : 
     622           1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
     623           1 :         p.mu.Lock()
     624           1 :         defer p.mu.Unlock()
     625           1 :         v := p.mu.protectedObjects[fileNum]
     626           1 :         if invariants.Enabled && v == 0 {
     627           0 :                 panic("invalid protection count")
     628             :         }
     629           1 :         if v > 1 {
     630           0 :                 p.mu.protectedObjects[fileNum] = v - 1
     631           1 :         } else {
     632           1 :                 delete(p.mu.protectedObjects, fileNum)
     633           1 :                 // TODO(radu): check if the object is still in knownObject; if not, unref it
     634           1 :                 // now.
     635           1 :         }
     636             : }
     637             : 
     638           2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
     639           2 :         p.mu.Lock()
     640           2 :         defer p.mu.Unlock()
     641           2 :         return p.mu.protectedObjects[fileNum] > 0
     642           2 : }

Generated by: LCOV version 1.14