LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - provider.go (source / functions) Hit Total Coverage
Test: 2024-02-11 08:15Z 3e083df5 - tests + meta.lcov Lines: 239 284 84.2 %
Date: 2024-02-11 08:16:29 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "io"
      11             :         "os"
      12             :         "slices"
      13             :         "sync"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/errors/oserror"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/objstorage"
      20             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      22             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      23             :         "github.com/cockroachdb/pebble/objstorage/remote"
      24             :         "github.com/cockroachdb/pebble/vfs"
      25             : )
      26             : 
      27             : // provider is the implementation of objstorage.Provider.
      28             : type provider struct {
      29             :         st Settings // settings passed to Open, stored as-is
      30             : 
      31             :         fsDir vfs.File // handle on st.FSDirName obtained via FS.OpenDir; released by Close
      32             : 
      33             :         tracer *objiotracing.Tracer // only assigned in open when objiotracing.Enabled
      34             : 
      35             :         remote remoteSubsystem // remote-object state; its cache field backs Metrics
      36             : 
      37             :         mu struct {
      38             :                 sync.RWMutex // held (read or write) by every accessor of the fields below in this file
      39             : 
      40             :                 remote struct {
      41             :                         // catalogBatch accumulates remote object creations and deletions until
      42             :                         // Sync is called.
      43             :                         catalogBatch remoteobjcat.Batch
      44             : 
      45             :                         storageObjects map[remote.Locator]remote.Storage // NOTE(review): presumably opened storages keyed by locator; not used in this file - confirm
      46             :                 }
      47             : 
      48             :                 // localObjectsChanged is set if non-remote objects were created or deleted
      49             :                 // but Sync was not yet called.
      50             :                 localObjectsChanged bool
      51             : 
      52             :                 // knownObjects maintains information about objects that are known to the provider.
      53             :                 // It is initialized with the list of files in the manifest when we open a DB.
      54             :                 knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata
      55             : 
      56             :                 // protectedObjects are objects that cannot be unreferenced because they
      57             :                 // have outstanding SharedObjectBackingHandles. The value is a count of
      58             :                 protectedObjects map[base.DiskFileNum]int // outstanding handles; entries are deleted when the count reaches zero
      59             :         }
      60             : }
      61             : 
      62             : var _ objstorage.Provider = (*provider)(nil) // compile-time check that *provider satisfies objstorage.Provider
      63             : 
      64             : // Settings that must be specified when creating the provider.
      65             : type Settings struct {
      66             :         Logger base.Logger // OpenForReading calls Logger.Fatalf when a MustExist lookup fails
      67             : 
      68             :         // Local filesystem configuration.
      69             :         FS        vfs.FS
      70             :         FSDirName string
      71             : 
      72             :         // FSDirInitialListing is a listing of FSDirName at the time of calling Open.
      73             :         //
      74             :         // This is an optional optimization to avoid double listing on Open when the
      75             :         // higher layer already has a listing. When nil, we obtain the listing on
      76             :         // Open.
      77             :         FSDirInitialListing []string
      78             : 
      79             :         // FSCleaner cleans obsolete files from the local filesystem.
      80             :         //
      81             :         // DefaultSettings sets it to base.DeleteCleaner.
      82             :         FSCleaner base.Cleaner
      83             : 
      84             :         // NoSyncOnClose decides whether the implementation will enforce a
      85             :         // close-time synchronization (e.g., fdatasync() or sync_file_range())
      86             :         // on files it writes to. Setting this to true removes the guarantee for a
      87             :         // sync on close. Some implementations can still issue a non-blocking sync.
      88             :         NoSyncOnClose bool
      89             : 
      90             :         // BytesPerSync enables periodic syncing of files in order to smooth out
      91             :         // writes to disk. This option does not provide any persistence guarantee, but
      92             :         // is used to avoid latency spikes if the OS automatically decides to write
      93             :         // out a large chunk of dirty filesystem buffers.
      94             :         BytesPerSync int
      95             : 
      96             :         // Fields here are set only if the provider is to support remote objects
      97             :         // (experimental).
      98             :         Remote struct {
      99             :                 StorageFactory remote.StorageFactory
     100             : 
     101             :                 // If CreateOnShared is non-zero, sstables are created on remote storage using
     102             :                 // the CreateOnSharedLocator (when the PreferSharedStorage create option is
     103             :                 // true).
     104             :                 CreateOnShared        remote.CreateOnSharedStrategy
     105             :                 CreateOnSharedLocator remote.Locator
     106             : 
     107             :                 // CacheSizeBytes is the size of the on-disk block cache for objects
     108             :                 // on remote storage. If it is 0, no cache is used.
     109             :                 CacheSizeBytes int64
     110             : 
     111             :                 // CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
     112             :                 CacheBlockSize int
     113             : 
     114             :                 // ShardingBlockSize is the size of a shard block. The cache is split into contiguous
     115             :                 // ShardingBlockSize units. The units are distributed across multiple independent shards
     116             :                 // of the cache, via a hash(offset) modulo num shards operation. The cache replacement
     117             :                 // policies operate at the level of shard, not whole cache. This is done to reduce lock
     118             :                 // contention.
     119             :                 //
     120             :                 // If ShardingBlockSize is 0, the default of 1 MB is used.
     121             :                 ShardingBlockSize int64
     122             : 
     123             :                 // CacheShardCount is the number of independent shards the cache leverages. Each shard
     124             :                 // is the same size, and a hash of filenum & offset maps a read to a certain shard. If
     125             :                 // set to 0, 2*runtime.GOMAXPROCS is used as the shard count.
     126             :                 CacheShardCount int
     127             : 
     128             :                 // TODO(radu): allow the cache to live on another FS/location (e.g. to use
     129             :                 // instance-local SSD).
     130             :         }
     131             : }
     132             : 
     133             : // DefaultSettings initializes default settings (with no remote storage),
     134             : // suitable for tests and tools.
     135           1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
     136           1 :         return Settings{
     137           1 :                 Logger:        base.DefaultLogger,
     138           1 :                 FS:            fs,
     139           1 :                 FSDirName:     dirName,
     140           1 :                 FSCleaner:     base.DeleteCleaner{},
     141           1 :                 NoSyncOnClose: false,
     142           1 :                 BytesPerSync:  512 * 1024, // 512KB
     143           1 :         }
     144           1 : }
     145             : 
     146             : // Open creates the provider.
     147           2 : func Open(settings Settings) (objstorage.Provider, error) {
     148           2 :         // Note: we can't just `return open(settings)` because in an error case we
     149           2 :         // would return (*provider)(nil) which is not objstorage.Provider(nil).
     150           2 :         p, err := open(settings)
     151           2 :         if err != nil {
     152           1 :                 return nil, err
     153           1 :         }
     154           2 :         return p, nil
     155             : }
     156             : 
     157           2 : func open(settings Settings) (p *provider, _ error) {
     158           2 :         fsDir, err := settings.FS.OpenDir(settings.FSDirName)
     159           2 :         if err != nil {
     160           1 :                 return nil, err
     161           1 :         }
     162             : 
     163           2 :         defer func() {
     164           2 :                 if p == nil {
     165           0 :                         fsDir.Close()
     166           0 :                 }
     167             :         }()
     168             : 
     169           2 :         p = &provider{
     170           2 :                 st:    settings,
     171           2 :                 fsDir: fsDir,
     172           2 :         }
     173           2 :         p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
     174           2 :         p.mu.protectedObjects = make(map[base.DiskFileNum]int)
     175           2 : 
     176           2 :         if objiotracing.Enabled {
     177           0 :                 p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
     178           0 :         }
     179             : 
     180             :         // Add local FS objects.
     181           2 :         if err := p.vfsInit(); err != nil {
     182           0 :                 return nil, err
     183           0 :         }
     184             : 
     185             :         // Initialize remote subsystem (if configured) and add remote objects.
     186           2 :         if err := p.remoteInit(); err != nil {
     187           0 :                 return nil, err
     188           0 :         }
     189             : 
     190           2 :         return p, nil
     191             : }
     192             : 
     193             : // Close is part of the objstorage.Provider interface.
     194           2 : func (p *provider) Close() error {
     195           2 :         err := p.sharedClose()
     196           2 :         if p.fsDir != nil {
     197           2 :                 err = firstError(err, p.fsDir.Close())
     198           2 :                 p.fsDir = nil
     199           2 :         }
     200           2 :         if objiotracing.Enabled {
     201           0 :                 if p.tracer != nil {
     202           0 :                         p.tracer.Close()
     203           0 :                         p.tracer = nil
     204           0 :                 }
     205             :         }
     206           2 :         return err
     207             : }
     208             : 
     209             : // OpenForReading opens an existing object.
     210             : func (p *provider) OpenForReading(
     211             :         ctx context.Context,
     212             :         fileType base.FileType,
     213             :         fileNum base.DiskFileNum,
     214             :         opts objstorage.OpenOptions,
     215           2 : ) (objstorage.Readable, error) {
     216           2 :         meta, err := p.Lookup(fileType, fileNum)
     217           2 :         if err != nil {
     218           1 :                 if opts.MustExist {
     219           0 :                         p.st.Logger.Fatalf("%v", err)
     220           0 :                 }
     221           1 :                 return nil, err
     222             :         }
     223             : 
     224           2 :         var r objstorage.Readable
     225           2 :         if !meta.IsRemote() {
     226           2 :                 r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
     227           2 :         } else {
     228           2 :                 r, err = p.remoteOpenForReading(ctx, meta, opts)
     229           2 :                 if err != nil && p.isNotExistError(meta, err) {
     230           1 :                         // Wrap the error so that IsNotExistError functions properly.
     231           1 :                         err = errors.Mark(err, os.ErrNotExist)
     232           1 :                 }
     233             :         }
     234           2 :         if err != nil {
     235           1 :                 return nil, err
     236           1 :         }
     237           2 :         if objiotracing.Enabled {
     238           0 :                 r = p.tracer.WrapReadable(ctx, r, fileNum)
     239           0 :         }
     240           2 :         return r, nil
     241             : }
     242             : 
     243             : // Create creates a new object and opens it for writing.
     244             : //
     245             : // The object is not guaranteed to be durable (accessible in case of crashes)
     246             : // until Sync is called.
     247             : func (p *provider) Create(
     248             :         ctx context.Context,
     249             :         fileType base.FileType,
     250             :         fileNum base.DiskFileNum,
     251             :         opts objstorage.CreateOptions,
     252           2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
     253           2 :         if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
     254           2 :                 w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
     255           2 :         } else {
     256           2 :                 w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
     257           2 :         }
     258           2 :         if err != nil {
     259           1 :                 err = errors.Wrapf(err, "creating object %s", fileNum)
     260           1 :                 return nil, objstorage.ObjectMetadata{}, err
     261           1 :         }
     262           2 :         p.addMetadata(meta)
     263           2 :         if objiotracing.Enabled {
     264           0 :                 w = p.tracer.WrapWritable(ctx, w, fileNum)
     265           0 :         }
     266           2 :         return w, meta, nil
     267             : }
     268             : 
     269             : // Remove removes an object.
     270             : //
     271             : // Note that if the object is remote, the object is only (conceptually) removed
     272             : // from this provider. If other providers have references on the remote object,
     273             : // it will not be removed.
     274             : //
     275             : // The object is not guaranteed to be durably removed until Sync is called.
     276           2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
     277           2 :         meta, err := p.Lookup(fileType, fileNum)
     278           2 :         if err != nil {
     279           1 :                 return err
     280           1 :         }
     281             : 
     282           2 :         if !meta.IsRemote() {
     283           2 :                 err = p.vfsRemove(fileType, fileNum)
     284           2 :         } else {
     285           2 :                 // TODO(radu): implement remote object removal (i.e. deref).
     286           2 :                 err = p.sharedUnref(meta)
     287           2 :                 if err != nil && p.isNotExistError(meta, err) {
     288           0 :                         // Wrap the error so that IsNotExistError functions properly.
     289           0 :                         err = errors.Mark(err, os.ErrNotExist)
     290           0 :                 }
     291             :         }
     292           2 :         if err != nil && !p.IsNotExistError(err) {
     293           1 :                 // We want to be able to retry a Remove, so we keep the object in our list.
     294           1 :                 // TODO(radu): we should mark the object as "zombie" and not allow any other
     295           1 :                 // operations.
     296           1 :                 return errors.Wrapf(err, "removing object %s", fileNum)
     297           1 :         }
     298             : 
     299           2 :         p.removeMetadata(fileNum)
     300           2 :         return err
     301             : }
     302             : 
     303           1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
     304           1 :         if meta.Remote.Storage != nil {
     305           1 :                 return meta.Remote.Storage.IsNotExistError(err)
     306           1 :         }
     307           0 :         return oserror.IsNotExist(err)
     308             : }
     309             : 
     310             : // IsNotExistError is part of the objstorage.Provider interface.
     311           2 : func (p *provider) IsNotExistError(err error) bool {
     312           2 :         // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
     313           2 :         // remote.Storage.
     314           2 :         return oserror.IsNotExist(err)
     315           2 : }
     316             : 
     317             : // Sync flushes the metadata from creation or removal of objects since the last Sync.
     318           2 : func (p *provider) Sync() error {
     319           2 :         if err := p.vfsSync(); err != nil {
     320           1 :                 return err
     321           1 :         }
     322           2 :         if err := p.sharedSync(); err != nil {
     323           0 :                 return err
     324           0 :         }
     325           2 :         return nil
     326             : }
     327             : 
     328             : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
     329             : // local file or a hard link (if the new object is created on the same FS, and
     330             : // if the FS supports it).
     331             : //
     332             : // The object is not guaranteed to be durable (accessible in case of crashes)
     333             : // until Sync is called.
     334             : func (p *provider) LinkOrCopyFromLocal(
     335             :         ctx context.Context,
     336             :         srcFS vfs.FS,
     337             :         srcFilePath string,
     338             :         dstFileType base.FileType,
     339             :         dstFileNum base.DiskFileNum,
     340             :         opts objstorage.CreateOptions,
     341           2 : ) (objstorage.ObjectMetadata, error) {
     342           2 :         shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
     343           2 :         if !shared && srcFS == p.st.FS {
     344           2 :                 // Wrap the normal filesystem with one which wraps newly created files with
     345           2 :                 // vfs.NewSyncingFile.
     346           2 :                 fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
     347           2 :                         NoSyncOnClose: p.st.NoSyncOnClose,
     348           2 :                         BytesPerSync:  p.st.BytesPerSync,
     349           2 :                 })
     350           2 :                 dstPath := p.vfsPath(dstFileType, dstFileNum)
     351           2 :                 if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
     352           1 :                         return objstorage.ObjectMetadata{}, err
     353           1 :                 }
     354             : 
     355           2 :                 meta := objstorage.ObjectMetadata{
     356           2 :                         DiskFileNum: dstFileNum,
     357           2 :                         FileType:    dstFileType,
     358           2 :                 }
     359           2 :                 p.addMetadata(meta)
     360           2 :                 return meta, nil
     361             :         }
     362             :         // Create the object and copy the data.
     363           2 :         w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
     364           2 :         if err != nil {
     365           0 :                 return objstorage.ObjectMetadata{}, err
     366           0 :         }
     367           2 :         f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
     368           2 :         if err != nil {
     369           0 :                 return objstorage.ObjectMetadata{}, err
     370           0 :         }
     371           2 :         defer f.Close()
     372           2 :         buf := make([]byte, 64*1024)
     373           2 :         for {
     374           2 :                 n, readErr := f.Read(buf)
     375           2 :                 if readErr != nil && readErr != io.EOF {
     376           0 :                         w.Abort()
     377           0 :                         return objstorage.ObjectMetadata{}, readErr
     378           0 :                 }
     379             : 
     380           2 :                 if n > 0 {
     381           2 :                         if err := w.Write(buf[:n]); err != nil {
     382           0 :                                 w.Abort()
     383           0 :                                 return objstorage.ObjectMetadata{}, err
     384           0 :                         }
     385             :                 }
     386             : 
     387           2 :                 if readErr == io.EOF {
     388           2 :                         break
     389             :                 }
     390             :         }
     391           2 :         if err := w.Finish(); err != nil {
     392           0 :                 return objstorage.ObjectMetadata{}, err
     393           0 :         }
     394           2 :         return meta, nil
     395             : }
     396             : 
     397             : // Lookup is part of the objstorage.Provider interface.
     398             : func (p *provider) Lookup(
     399             :         fileType base.FileType, fileNum base.DiskFileNum,
     400           2 : ) (objstorage.ObjectMetadata, error) {
     401           2 :         p.mu.RLock()
     402           2 :         defer p.mu.RUnlock()
     403           2 :         meta, ok := p.mu.knownObjects[fileNum]
     404           2 :         if !ok {
     405           1 :                 return objstorage.ObjectMetadata{}, errors.Wrapf(
     406           1 :                         os.ErrNotExist,
     407           1 :                         "file %s (type %d) unknown to the objstorage provider",
     408           1 :                         fileNum, errors.Safe(fileType),
     409           1 :                 )
     410           1 :         }
     411           2 :         if meta.FileType != fileType {
     412           0 :                 return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
     413           0 :                         "file %s type mismatch (known type %d, expected type %d)",
     414           0 :                         fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
     415           0 :                 )
     416           0 :         }
     417           2 :         return meta, nil
     418             : }
     419             : 
     420             : // Path is part of the objstorage.Provider interface.
     421           2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
     422           2 :         if !meta.IsRemote() {
     423           2 :                 return p.vfsPath(meta.FileType, meta.DiskFileNum)
     424           2 :         }
     425           2 :         return p.remotePath(meta)
     426             : }
     427             : 
     428             : // Size returns the size of the object.
     429           2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
     430           2 :         if !meta.IsRemote() {
     431           2 :                 return p.vfsSize(meta.FileType, meta.DiskFileNum)
     432           2 :         }
     433           2 :         return p.remoteSize(meta)
     434             : }
     435             : 
     436             : // List is part of the objstorage.Provider interface.
     437           2 : func (p *provider) List() []objstorage.ObjectMetadata {
     438           2 :         p.mu.RLock()
     439           2 :         defer p.mu.RUnlock()
     440           2 :         res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
     441           2 :         for _, meta := range p.mu.knownObjects {
     442           2 :                 res = append(res, meta)
     443           2 :         }
     444           2 :         slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
     445           2 :                 return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
     446           2 :         })
     447           2 :         return res
     448             : }
     449             : 
     450             : // Metrics is part of the objstorage.Provider interface.
     451           2 : func (p *provider) Metrics() sharedcache.Metrics {
     452           2 :         if p.remote.cache != nil {
     453           1 :                 return p.remote.cache.Metrics()
     454           1 :         }
     455           2 :         return sharedcache.Metrics{}
     456             : }
     457             : 
     458           2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
     459           2 :         p.mu.Lock()
     460           2 :         defer p.mu.Unlock()
     461           2 :         p.addMetadataLocked(meta)
     462           2 : }
     463             : 
     464           2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
     465           2 :         if invariants.Enabled {
     466           2 :                 meta.AssertValid()
     467           2 :         }
     468           2 :         p.mu.knownObjects[meta.DiskFileNum] = meta
     469           2 :         if meta.IsRemote() {
     470           2 :                 p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
     471           2 :                         FileNum:          meta.DiskFileNum,
     472           2 :                         FileType:         meta.FileType,
     473           2 :                         CreatorID:        meta.Remote.CreatorID,
     474           2 :                         CreatorFileNum:   meta.Remote.CreatorFileNum,
     475           2 :                         Locator:          meta.Remote.Locator,
     476           2 :                         CleanupMethod:    meta.Remote.CleanupMethod,
     477           2 :                         CustomObjectName: meta.Remote.CustomObjectName,
     478           2 :                 })
     479           2 :         } else {
     480           2 :                 p.mu.localObjectsChanged = true
     481           2 :         }
     482             : }
     483             : 
     484           2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
     485           2 :         p.mu.Lock()
     486           2 :         defer p.mu.Unlock()
     487           2 : 
     488           2 :         meta, ok := p.mu.knownObjects[fileNum]
     489           2 :         if !ok {
     490           0 :                 return
     491           0 :         }
     492           2 :         delete(p.mu.knownObjects, fileNum)
     493           2 :         if meta.IsRemote() {
     494           2 :                 p.mu.remote.catalogBatch.DeleteObject(fileNum)
     495           2 :         } else {
     496           2 :                 p.mu.localObjectsChanged = true
     497           2 :         }
     498             : }
     499             : 
     500             : // protectObject prevents the unreferencing of a remote object until
     501             : // unprotectObject is called.
     502           2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
     503           2 :         p.mu.Lock()
     504           2 :         defer p.mu.Unlock()
     505           2 :         p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
     506           2 : }
     507             : 
     508           1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
     509           1 :         p.mu.Lock()
     510           1 :         defer p.mu.Unlock()
     511           1 :         v := p.mu.protectedObjects[fileNum]
     512           1 :         if invariants.Enabled && v == 0 {
     513           0 :                 panic("invalid protection count")
     514             :         }
     515           1 :         if v > 1 {
     516           0 :                 p.mu.protectedObjects[fileNum] = v - 1
     517           1 :         } else {
     518           1 :                 delete(p.mu.protectedObjects, fileNum)
     519           1 :                 // TODO(radu): check if the object is still in knownObject; if not, unref it
     520           1 :                 // now.
     521           1 :         }
     522             : }
     523             : 
     524           2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
     525           2 :         p.mu.Lock()
     526           2 :         defer p.mu.Unlock()
     527           2 :         return p.mu.protectedObjects[fileNum] > 0
     528           2 : }

Generated by: LCOV version 1.14