LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - provider.go (source / functions) Hit Total Coverage
Test: 2023-09-25 08:17Z 86593692 - meta test only.lcov Lines: 174 280 62.1 %
Date: 2023-09-25 08:18:44 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "io"
      10             :         "os"
      11             :         "sort"
      12             :         "sync"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/errors/oserror"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/invariants"
      18             :         "github.com/cockroachdb/pebble/objstorage"
      19             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      20             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      21             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      22             :         "github.com/cockroachdb/pebble/objstorage/remote"
      23             :         "github.com/cockroachdb/pebble/vfs"
      24             : )
      25             : 
// provider is the implementation of objstorage.Provider.
//
// It tracks every object it knows about in mu.knownObjects and dispatches each
// operation to either the local-filesystem ("vfs") code path or the remote
// code path, depending on the object's metadata.
type provider struct {
	// st holds the settings that were passed to Open.
	st Settings

	// fsDir is the open handle for st.FSDirName. It is released (and reset to
	// nil) by Close.
	fsDir vfs.File

	// tracer is only populated when objiotracing.Enabled is true; it wraps the
	// readables and writables handed out by the provider.
	tracer *objiotracing.Tracer

	// remote holds the state of the remote subsystem (including the optional
	// cache consulted by Metrics).
	// NOTE(review): presumably populated by remoteInit - confirm.
	remote remoteSubsystem

	// mu protects the bookkeeping fields below.
	mu struct {
		sync.RWMutex

		remote struct {
			// catalogBatch accumulates remote object creations and deletions until
			// Sync is called.
			catalogBatch remoteobjcat.Batch

			// NOTE(review): presumably the remote.Storage instance associated with
			// each locator - confirm against the remote subsystem code.
			storageObjects map[remote.Locator]remote.Storage
		}

		// localObjectsChanged is set if non-remote objects were created or deleted
		// but Sync was not yet called.
		localObjectsChanged bool

		// knownObjects maintains information about objects that are known to the provider.
		// It is initialized with the list of files in the manifest when we open a DB.
		knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata

		// protectedObjects are objects that cannot be unreferenced because they
		// have outstanding SharedObjectBackingHandles. The value is a count of
		// outstanding handles; entries are deleted when the count drops to zero.
		protectedObjects map[base.DiskFileNum]int
	}
}
      60             : 
// Compile-time assertion that *provider implements objstorage.Provider.
var _ objstorage.Provider = (*provider)(nil)
      62             : 
// Settings that must be specified when creating the provider.
type Settings struct {
	// Logger is used by the provider to report fatal conditions (for example,
	// when OpenForReading is asked for a missing object with MustExist set).
	Logger base.Logger

	// Local filesystem configuration: FS is the filesystem that stores local
	// objects and FSDirName is the directory within it that the provider opens.
	FS        vfs.FS
	FSDirName string

	// FSDirInitialListing is a listing of FSDirName at the time of calling Open.
	//
	// This is an optional optimization to avoid double listing on Open when the
	// higher layer already has a listing. When nil, we obtain the listing on
	// Open.
	FSDirInitialListing []string

	// FSCleaner cleans obsolete files from the local filesystem.
	//
	// The default cleaner uses the DeleteCleaner.
	FSCleaner base.Cleaner

	// NoSyncOnClose decides whether the implementation will enforce a
	// close-time synchronization (e.g., fdatasync() or sync_file_range())
	// on files it writes to. Setting this to true removes the guarantee for a
	// sync on close. Some implementations can still issue a non-blocking sync.
	NoSyncOnClose bool

	// BytesPerSync enables periodic syncing of files in order to smooth out
	// writes to disk. This option does not provide any persistence guarantee, but
	// is used to avoid latency spikes if the OS automatically decides to write
	// out a large chunk of dirty filesystem buffers.
	BytesPerSync int

	// Remote groups the fields that are set only if the provider is to support
	// remote objects (experimental).
	Remote struct {
		// StorageFactory is the factory for remote.Storage instances.
		// NOTE(review): presumably resolves a remote.Locator to its storage - confirm.
		StorageFactory remote.StorageFactory

		// If CreateOnShared is true, sstables are created on remote storage using
		// the CreateOnSharedLocator (when the PreferSharedStorage create option is
		// true).
		CreateOnShared        bool
		CreateOnSharedLocator remote.Locator

		// CacheSizeBytes is the size of the on-disk block cache for objects
		// on remote storage. If it is 0, no cache is used.
		CacheSizeBytes int64

		// CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
		CacheBlockSize int

		// ShardingBlockSize is the size of a shard block. The cache is split into contiguous
		// ShardingBlockSize units. The units are distributed across multiple independent shards
		// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
		// policies operate at the level of shard, not whole cache. This is done to reduce lock
		// contention.
		//
		// If ShardingBlockSize is 0, the default of 1 MB is used.
		ShardingBlockSize int64

		// CacheShardCount is the number of independent shards the cache leverages.
		// Each shard is the same size, and a hash of filenum & offset maps a read to
		// a certain shard. If set to 0, 2*runtime.GOMAXPROCS is used as the shard
		// count.
		CacheShardCount int

		// TODO(radu): allow the cache to live on another FS/location (e.g. to use
		// instance-local SSD).
	}
}
     131             : 
     132             : // DefaultSettings initializes default settings (with no remote storage),
     133             : // suitable for tests and tools.
     134           0 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
     135           0 :         return Settings{
     136           0 :                 Logger:        base.DefaultLogger,
     137           0 :                 FS:            fs,
     138           0 :                 FSDirName:     dirName,
     139           0 :                 FSCleaner:     base.DeleteCleaner{},
     140           0 :                 NoSyncOnClose: false,
     141           0 :                 BytesPerSync:  512 * 1024, // 512KB
     142           0 :         }
     143           0 : }
     144             : 
     145             : // Open creates the provider.
     146           1 : func Open(settings Settings) (objstorage.Provider, error) {
     147           1 :         // Note: we can't just `return open(settings)` because in an error case we
     148           1 :         // would return (*provider)(nil) which is not objstorage.Provider(nil).
     149           1 :         p, err := open(settings)
     150           1 :         if err != nil {
     151           0 :                 return nil, err
     152           0 :         }
     153           1 :         return p, nil
     154             : }
     155             : 
     156           1 : func open(settings Settings) (p *provider, _ error) {
     157           1 :         fsDir, err := settings.FS.OpenDir(settings.FSDirName)
     158           1 :         if err != nil {
     159           0 :                 return nil, err
     160           0 :         }
     161             : 
     162           1 :         defer func() {
     163           1 :                 if p == nil {
     164           0 :                         fsDir.Close()
     165           0 :                 }
     166             :         }()
     167             : 
     168           1 :         p = &provider{
     169           1 :                 st:    settings,
     170           1 :                 fsDir: fsDir,
     171           1 :         }
     172           1 :         p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
     173           1 :         p.mu.protectedObjects = make(map[base.DiskFileNum]int)
     174           1 : 
     175           1 :         if objiotracing.Enabled {
     176           0 :                 p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
     177           0 :         }
     178             : 
     179             :         // Add local FS objects.
     180           1 :         if err := p.vfsInit(); err != nil {
     181           0 :                 return nil, err
     182           0 :         }
     183             : 
     184             :         // Initialize remote subsystem (if configured) and add remote objects.
     185           1 :         if err := p.remoteInit(); err != nil {
     186           0 :                 return nil, err
     187           0 :         }
     188             : 
     189           1 :         return p, nil
     190             : }
     191             : 
     192             : // Close is part of the objstorage.Provider interface.
     193           1 : func (p *provider) Close() error {
     194           1 :         err := p.sharedClose()
     195           1 :         if p.fsDir != nil {
     196           1 :                 err = firstError(err, p.fsDir.Close())
     197           1 :                 p.fsDir = nil
     198           1 :         }
     199           1 :         if objiotracing.Enabled {
     200           0 :                 if p.tracer != nil {
     201           0 :                         p.tracer.Close()
     202           0 :                         p.tracer = nil
     203           0 :                 }
     204             :         }
     205           1 :         return err
     206             : }
     207             : 
     208             : // OpenForReading opens an existing object.
     209             : func (p *provider) OpenForReading(
     210             :         ctx context.Context,
     211             :         fileType base.FileType,
     212             :         fileNum base.DiskFileNum,
     213             :         opts objstorage.OpenOptions,
     214           1 : ) (objstorage.Readable, error) {
     215           1 :         meta, err := p.Lookup(fileType, fileNum)
     216           1 :         if err != nil {
     217           0 :                 if opts.MustExist {
     218           0 :                         p.st.Logger.Fatalf("%v", err)
     219           0 :                 }
     220           0 :                 return nil, err
     221             :         }
     222             : 
     223           1 :         var r objstorage.Readable
     224           1 :         if !meta.IsRemote() {
     225           1 :                 r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
     226           1 :         } else {
     227           1 :                 r, err = p.remoteOpenForReading(ctx, meta, opts)
     228           1 :                 if err != nil && p.isNotExistError(meta, err) {
     229           0 :                         // Wrap the error so that IsNotExistError functions properly.
     230           0 :                         err = errors.Mark(err, os.ErrNotExist)
     231           0 :                 }
     232             :         }
     233           1 :         if err != nil {
     234           0 :                 return nil, err
     235           0 :         }
     236           1 :         if objiotracing.Enabled {
     237           0 :                 r = p.tracer.WrapReadable(ctx, r, fileNum)
     238           0 :         }
     239           1 :         return r, nil
     240             : }
     241             : 
     242             : // Create creates a new object and opens it for writing.
     243             : //
     244             : // The object is not guaranteed to be durable (accessible in case of crashes)
     245             : // until Sync is called.
     246             : func (p *provider) Create(
     247             :         ctx context.Context,
     248             :         fileType base.FileType,
     249             :         fileNum base.DiskFileNum,
     250             :         opts objstorage.CreateOptions,
     251           1 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
     252           1 :         if opts.PreferSharedStorage && p.st.Remote.CreateOnShared {
     253           1 :                 w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
     254           1 :         } else {
     255           1 :                 w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
     256           1 :         }
     257           1 :         if err != nil {
     258           0 :                 err = errors.Wrapf(err, "creating object %s", errors.Safe(fileNum))
     259           0 :                 return nil, objstorage.ObjectMetadata{}, err
     260           0 :         }
     261           1 :         p.addMetadata(meta)
     262           1 :         if objiotracing.Enabled {
     263           0 :                 w = p.tracer.WrapWritable(ctx, w, fileNum)
     264           0 :         }
     265           1 :         return w, meta, nil
     266             : }
     267             : 
     268             : // Remove removes an object.
     269             : //
     270             : // Note that if the object is remote, the object is only (conceptually) removed
     271             : // from this provider. If other providers have references on the remote object,
     272             : // it will not be removed.
     273             : //
     274             : // The object is not guaranteed to be durably removed until Sync is called.
     275           1 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
     276           1 :         meta, err := p.Lookup(fileType, fileNum)
     277           1 :         if err != nil {
     278           0 :                 return err
     279           0 :         }
     280             : 
     281           1 :         if !meta.IsRemote() {
     282           1 :                 err = p.vfsRemove(fileType, fileNum)
     283           1 :         } else {
     284           1 :                 // TODO(radu): implement remote object removal (i.e. deref).
     285           1 :                 err = p.sharedUnref(meta)
     286           1 :                 if err != nil && p.isNotExistError(meta, err) {
     287           0 :                         // Wrap the error so that IsNotExistError functions properly.
     288           0 :                         err = errors.Mark(err, os.ErrNotExist)
     289           0 :                 }
     290             :         }
     291           1 :         if err != nil && !p.IsNotExistError(err) {
     292           0 :                 // We want to be able to retry a Remove, so we keep the object in our list.
     293           0 :                 // TODO(radu): we should mark the object as "zombie" and not allow any other
     294           0 :                 // operations.
     295           0 :                 return errors.Wrapf(err, "removing object %s", errors.Safe(fileNum))
     296           0 :         }
     297             : 
     298           1 :         p.removeMetadata(fileNum)
     299           1 :         return err
     300             : }
     301             : 
     302           0 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
     303           0 :         if meta.Remote.Storage != nil {
     304           0 :                 return meta.Remote.Storage.IsNotExistError(err)
     305           0 :         }
     306           0 :         return oserror.IsNotExist(err)
     307             : }
     308             : 
     309             : // IsNotExistError is part of the objstorage.Provider interface.
     310           1 : func (p *provider) IsNotExistError(err error) bool {
     311           1 :         // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
     312           1 :         // remote.Storage.
     313           1 :         return oserror.IsNotExist(err)
     314           1 : }
     315             : 
     316             : // Sync flushes the metadata from creation or removal of objects since the last Sync.
     317           1 : func (p *provider) Sync() error {
     318           1 :         if err := p.vfsSync(); err != nil {
     319           0 :                 return err
     320           0 :         }
     321           1 :         if err := p.sharedSync(); err != nil {
     322           0 :                 return err
     323           0 :         }
     324           1 :         return nil
     325             : }
     326             : 
     327             : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
     328             : // local file or a hard link (if the new object is created on the same FS, and
     329             : // if the FS supports it).
     330             : //
     331             : // The object is not guaranteed to be durable (accessible in case of crashes)
     332             : // until Sync is called.
     333             : func (p *provider) LinkOrCopyFromLocal(
     334             :         ctx context.Context,
     335             :         srcFS vfs.FS,
     336             :         srcFilePath string,
     337             :         dstFileType base.FileType,
     338             :         dstFileNum base.DiskFileNum,
     339             :         opts objstorage.CreateOptions,
     340           1 : ) (objstorage.ObjectMetadata, error) {
     341           1 :         shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared
     342           1 :         if !shared && srcFS == p.st.FS {
     343           1 :                 // Wrap the normal filesystem with one which wraps newly created files with
     344           1 :                 // vfs.NewSyncingFile.
     345           1 :                 fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
     346           1 :                         NoSyncOnClose: p.st.NoSyncOnClose,
     347           1 :                         BytesPerSync:  p.st.BytesPerSync,
     348           1 :                 })
     349           1 :                 dstPath := p.vfsPath(dstFileType, dstFileNum)
     350           1 :                 if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
     351           0 :                         return objstorage.ObjectMetadata{}, err
     352           0 :                 }
     353             : 
     354           1 :                 meta := objstorage.ObjectMetadata{
     355           1 :                         DiskFileNum: dstFileNum,
     356           1 :                         FileType:    dstFileType,
     357           1 :                 }
     358           1 :                 p.addMetadata(meta)
     359           1 :                 return meta, nil
     360             :         }
     361             :         // Create the object and copy the data.
     362           1 :         w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
     363           1 :         if err != nil {
     364           0 :                 return objstorage.ObjectMetadata{}, err
     365           0 :         }
     366           1 :         f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
     367           1 :         if err != nil {
     368           0 :                 return objstorage.ObjectMetadata{}, err
     369           0 :         }
     370           1 :         defer f.Close()
     371           1 :         buf := make([]byte, 64*1024)
     372           1 :         for {
     373           1 :                 n, readErr := f.Read(buf)
     374           1 :                 if readErr != nil && readErr != io.EOF {
     375           0 :                         w.Abort()
     376           0 :                         return objstorage.ObjectMetadata{}, readErr
     377           0 :                 }
     378             : 
     379           1 :                 if n > 0 {
     380           1 :                         if err := w.Write(buf[:n]); err != nil {
     381           0 :                                 w.Abort()
     382           0 :                                 return objstorage.ObjectMetadata{}, err
     383           0 :                         }
     384             :                 }
     385             : 
     386           1 :                 if readErr == io.EOF {
     387           1 :                         break
     388             :                 }
     389             :         }
     390           1 :         if err := w.Finish(); err != nil {
     391           0 :                 return objstorage.ObjectMetadata{}, err
     392           0 :         }
     393           1 :         return meta, nil
     394             : }
     395             : 
     396             : // Lookup is part of the objstorage.Provider interface.
     397             : func (p *provider) Lookup(
     398             :         fileType base.FileType, fileNum base.DiskFileNum,
     399           1 : ) (objstorage.ObjectMetadata, error) {
     400           1 :         p.mu.RLock()
     401           1 :         defer p.mu.RUnlock()
     402           1 :         meta, ok := p.mu.knownObjects[fileNum]
     403           1 :         if !ok {
     404           0 :                 return objstorage.ObjectMetadata{}, errors.Wrapf(
     405           0 :                         os.ErrNotExist,
     406           0 :                         "file %s (type %d) unknown to the objstorage provider",
     407           0 :                         errors.Safe(fileNum), errors.Safe(fileType),
     408           0 :                 )
     409           0 :         }
     410           1 :         if meta.FileType != fileType {
     411           0 :                 return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
     412           0 :                         "file %s type mismatch (known type %d, expected type %d)",
     413           0 :                         errors.Safe(fileNum), errors.Safe(meta.FileType), errors.Safe(fileType),
     414           0 :                 )
     415           0 :         }
     416           1 :         return meta, nil
     417             : }
     418             : 
     419             : // Path is part of the objstorage.Provider interface.
     420           1 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
     421           1 :         if !meta.IsRemote() {
     422           1 :                 return p.vfsPath(meta.FileType, meta.DiskFileNum)
     423           1 :         }
     424           1 :         return p.remotePath(meta)
     425             : }
     426             : 
     427             : // Size returns the size of the object.
     428           1 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
     429           1 :         if !meta.IsRemote() {
     430           1 :                 return p.vfsSize(meta.FileType, meta.DiskFileNum)
     431           1 :         }
     432           1 :         return p.remoteSize(meta)
     433             : }
     434             : 
     435             : // List is part of the objstorage.Provider interface.
     436           1 : func (p *provider) List() []objstorage.ObjectMetadata {
     437           1 :         p.mu.RLock()
     438           1 :         defer p.mu.RUnlock()
     439           1 :         res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
     440           1 :         for _, meta := range p.mu.knownObjects {
     441           1 :                 res = append(res, meta)
     442           1 :         }
     443           1 :         sort.Slice(res, func(i, j int) bool {
     444           1 :                 return res[i].DiskFileNum.FileNum() < res[j].DiskFileNum.FileNum()
     445           1 :         })
     446           1 :         return res
     447             : }
     448             : 
     449             : // Metrics is part of the objstorage.Provider interface.
     450           1 : func (p *provider) Metrics() sharedcache.Metrics {
     451           1 :         if p.remote.cache != nil {
     452           1 :                 return p.remote.cache.Metrics()
     453           1 :         }
     454           1 :         return sharedcache.Metrics{}
     455             : }
     456             : 
     457           1 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
     458           1 :         if invariants.Enabled {
     459           1 :                 meta.AssertValid()
     460           1 :         }
     461           1 :         p.mu.Lock()
     462           1 :         defer p.mu.Unlock()
     463           1 :         p.mu.knownObjects[meta.DiskFileNum] = meta
     464           1 :         if meta.IsRemote() {
     465           1 :                 p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
     466           1 :                         FileNum:        meta.DiskFileNum,
     467           1 :                         FileType:       meta.FileType,
     468           1 :                         CreatorID:      meta.Remote.CreatorID,
     469           1 :                         CreatorFileNum: meta.Remote.CreatorFileNum,
     470           1 :                         Locator:        meta.Remote.Locator,
     471           1 :                         CleanupMethod:  meta.Remote.CleanupMethod,
     472           1 :                 })
     473           1 :         } else {
     474           1 :                 p.mu.localObjectsChanged = true
     475           1 :         }
     476             : }
     477             : 
     478           1 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
     479           1 :         p.mu.Lock()
     480           1 :         defer p.mu.Unlock()
     481           1 : 
     482           1 :         meta, ok := p.mu.knownObjects[fileNum]
     483           1 :         if !ok {
     484           0 :                 return
     485           0 :         }
     486           1 :         delete(p.mu.knownObjects, fileNum)
     487           1 :         if meta.IsRemote() {
     488           1 :                 p.mu.remote.catalogBatch.DeleteObject(fileNum)
     489           1 :         } else {
     490           1 :                 p.mu.localObjectsChanged = true
     491           1 :         }
     492             : }
     493             : 
     494             : // protectObject prevents the unreferencing of a remote object until
     495             : // unprotectObject is called.
     496           0 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
     497           0 :         p.mu.Lock()
     498           0 :         defer p.mu.Unlock()
     499           0 :         p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
     500           0 : }
     501             : 
     502           0 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
     503           0 :         p.mu.Lock()
     504           0 :         defer p.mu.Unlock()
     505           0 :         v := p.mu.protectedObjects[fileNum]
     506           0 :         if invariants.Enabled && v == 0 {
     507           0 :                 panic("invalid protection count")
     508             :         }
     509           0 :         if v > 1 {
     510           0 :                 p.mu.protectedObjects[fileNum] = v - 1
     511           0 :         } else {
     512           0 :                 delete(p.mu.protectedObjects, fileNum)
     513           0 :                 // TODO(radu): check if the object is still in knownObject; if not, unref it
     514           0 :                 // now.
     515           0 :         }
     516             : }
     517             : 
     518           1 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
     519           1 :         p.mu.Lock()
     520           1 :         defer p.mu.Unlock()
     521           1 :         return p.mu.protectedObjects[fileNum] > 0
     522           1 : }

Generated by: LCOV version 1.14