LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider - remote.go (source / functions) Hit Total Coverage
Test: 2024-01-23 08:16Z 5b092519 - tests + meta.lcov Lines: 200 258 77.5 %
Date: 2024-01-23 08:17:09 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package objstorageprovider
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "runtime"
      11             :         "sync"
      12             :         "sync/atomic"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/invariants"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
      19             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
      20             :         "github.com/cockroachdb/pebble/objstorage/remote"
      21             :         "github.com/cockroachdb/redact"
      22             : )
      23             : 
      24             : // remoteSubsystem contains the provider fields related to remote storage.
      25             : // All fields remain unset if remote storage is not configured.
      26             : type remoteSubsystem struct {
      27             :         catalog *remoteobjcat.Catalog
      28             :         // catalogSyncMutex is used to correctly serialize two sharedSync operations.
      29             :         // It must be acquired before the provider mutex.
      30             :         catalogSyncMutex sync.Mutex
      31             : 
      32             :         cache *sharedcache.Cache
      33             : 
      34             :         // shared contains the fields relevant to shared objects, i.e. objects that
      35             :         // are created by Pebble and potentially shared between Pebble instances.
      36             :         shared struct {
      37             :                 // initialized guards access to the creatorID field.
      38             :                 initialized atomic.Bool
      39             :                 creatorID   objstorage.CreatorID
      40             :                 initOnce    sync.Once
      41             : 
      42             :                 // checkRefsOnOpen controls whether we check the ref marker file when opening
      43             :                 // an object. Normally this is true when invariants are enabled (but the provider
      44             :                 // test tweaks this field).
      45             :                 checkRefsOnOpen bool
      46             :         }
      47             : }
      48             : 
      49             : // remoteInit initializes the remote object subsystem (if configured) and finds
      50             : // any remote objects.
      51           2 : func (p *provider) remoteInit() error {
      52           2 :         if p.st.Remote.StorageFactory == nil {
      53           2 :                 return nil
      54           2 :         }
      55           2 :         catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
      56           2 :         if err != nil {
      57           0 :                 return errors.Wrapf(err, "pebble: could not open remote object catalog")
      58           0 :         }
      59           2 :         p.remote.catalog = catalog
      60           2 :         p.remote.shared.checkRefsOnOpen = invariants.Enabled
      61           2 : 
      62           2 :         // The creator ID may or may not be initialized yet.
      63           2 :         if contents.CreatorID.IsSet() {
      64           2 :                 p.remote.initShared(contents.CreatorID)
      65           2 :                 p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
      66           2 :         } else {
      67           2 :                 p.st.Logger.Infof("remote storage configured; no creatorID yet")
      68           2 :         }
      69             : 
      70           2 :         if p.st.Remote.CacheSizeBytes > 0 {
      71           1 :                 const defaultBlockSize = 32 * 1024
      72           1 :                 blockSize := p.st.Remote.CacheBlockSize
      73           1 :                 if blockSize == 0 {
      74           1 :                         blockSize = defaultBlockSize
      75           1 :                 }
      76             : 
      77           1 :                 const defaultShardingBlockSize = 1024 * 1024
      78           1 :                 shardingBlockSize := p.st.Remote.ShardingBlockSize
      79           1 :                 if shardingBlockSize == 0 {
      80           1 :                         shardingBlockSize = defaultShardingBlockSize
      81           1 :                 }
      82             : 
      83           1 :                 numShards := p.st.Remote.CacheShardCount
      84           1 :                 if numShards == 0 {
      85           1 :                         numShards = 2 * runtime.GOMAXPROCS(0)
      86           1 :                 }
      87             : 
      88           1 :                 p.remote.cache, err = sharedcache.Open(
      89           1 :                         p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
      90           1 :                 if err != nil {
      91           0 :                         return errors.Wrapf(err, "pebble: could not open remote object cache")
      92           0 :                 }
      93             :         }
      94             : 
      95           2 :         for _, meta := range contents.Objects {
      96           2 :                 o := objstorage.ObjectMetadata{
      97           2 :                         DiskFileNum: meta.FileNum,
      98           2 :                         FileType:    meta.FileType,
      99           2 :                 }
     100           2 :                 o.Remote.CreatorID = meta.CreatorID
     101           2 :                 o.Remote.CreatorFileNum = meta.CreatorFileNum
     102           2 :                 o.Remote.CleanupMethod = meta.CleanupMethod
     103           2 :                 o.Remote.Locator = meta.Locator
     104           2 :                 o.Remote.CustomObjectName = meta.CustomObjectName
     105           2 :                 o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
     106           2 :                 if err != nil {
     107           0 :                         return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
     108           0 :                 }
     109           2 :                 if invariants.Enabled {
     110           2 :                         o.AssertValid()
     111           2 :                 }
     112           2 :                 p.mu.knownObjects[o.DiskFileNum] = o
     113             :         }
     114           2 :         return nil
     115             : }
     116             : 
     117             : // initShared initializes the creator ID, allowing use of shared objects.
     118           2 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
     119           2 :         ss.shared.initOnce.Do(func() {
     120           2 :                 ss.shared.creatorID = creatorID
     121           2 :                 ss.shared.initialized.Store(true)
     122           2 :         })
     123             : }
     124             : 
     125           2 : func (p *provider) sharedClose() error {
     126           2 :         if p.st.Remote.StorageFactory == nil {
     127           2 :                 return nil
     128           2 :         }
     129           2 :         var err error
     130           2 :         if p.remote.cache != nil {
     131           1 :                 err = p.remote.cache.Close()
     132           1 :                 p.remote.cache = nil
     133           1 :         }
     134           2 :         if p.remote.catalog != nil {
     135           2 :                 err = firstError(err, p.remote.catalog.Close())
     136           2 :                 p.remote.catalog = nil
     137           2 :         }
     138           2 :         return err
     139             : }
     140             : 
     141             : // SetCreatorID is part of the objstorage.Provider interface.
     142           2 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
     143           2 :         if p.st.Remote.StorageFactory == nil {
     144           0 :                 return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
     145           0 :         }
     146             :         // Note: this call is a cheap no-op if the creator ID was already set. This
     147             :         // call also checks if we are trying to change the ID.
     148           2 :         if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
     149           0 :                 return err
     150           0 :         }
     151           2 :         if !p.remote.shared.initialized.Load() {
     152           2 :                 p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
     153           2 :                 p.remote.initShared(creatorID)
     154           2 :         }
     155           2 :         return nil
     156             : }
     157             : 
     158             : // IsSharedForeign is part of the objstorage.Provider interface.
     159           2 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
     160           2 :         if !p.remote.shared.initialized.Load() {
     161           0 :                 return false
     162           0 :         }
     163           2 :         return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
     164             : }
     165             : 
     166           2 : func (p *provider) remoteCheckInitialized() error {
     167           2 :         if p.st.Remote.StorageFactory == nil {
     168           0 :                 return errors.Errorf("remote object support not configured")
     169           0 :         }
     170           2 :         return nil
     171             : }
     172             : 
     173           2 : func (p *provider) sharedCheckInitialized() error {
     174           2 :         if err := p.remoteCheckInitialized(); err != nil {
     175           0 :                 return err
     176           0 :         }
     177           2 :         if !p.remote.shared.initialized.Load() {
     178           0 :                 return errors.Errorf("remote object support not available: remote creator ID not yet set")
     179           0 :         }
     180           2 :         return nil
     181             : }
     182             : 
     183           2 : func (p *provider) sharedSync() error {
     184           2 :         // Serialize parallel sync operations. Note that ApplyBatch is already
     185           2 :         // serialized internally, but we want to make sure they get called with
     186           2 :         // batches in the right order.
     187           2 :         p.remote.catalogSyncMutex.Lock()
     188           2 :         defer p.remote.catalogSyncMutex.Unlock()
     189           2 : 
     190           2 :         batch := func() remoteobjcat.Batch {
     191           2 :                 p.mu.Lock()
     192           2 :                 defer p.mu.Unlock()
     193           2 :                 res := p.mu.remote.catalogBatch.Copy()
     194           2 :                 p.mu.remote.catalogBatch.Reset()
     195           2 :                 return res
     196           2 :         }()
     197             : 
     198           2 :         if batch.IsEmpty() {
     199           2 :                 return nil
     200           2 :         }
     201             : 
     202           2 :         if err := p.remote.catalog.ApplyBatch(batch); err != nil {
     203           0 :                 // Put back the batch (for the next Sync), appending any operations that
     204           0 :                 // happened in the meantime.
     205           0 :                 p.mu.Lock()
     206           0 :                 defer p.mu.Unlock()
     207           0 :                 batch.Append(p.mu.remote.catalogBatch)
     208           0 :                 p.mu.remote.catalogBatch = batch
     209           0 :                 return err
     210           0 :         }
     211             : 
     212           2 :         return nil
     213             : }
     214             : 
     215           2 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
     216           2 :         if meta.Remote.Locator != "" {
     217           1 :                 return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
     218           1 :         }
     219           2 :         return "remote://" + remoteObjectName(meta)
     220             : }
     221             : 
     222             : // sharedCreateRef creates a reference marker object.
     223           2 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
     224           2 :         if err := p.sharedCheckInitialized(); err != nil {
     225           0 :                 return err
     226           0 :         }
     227           2 :         if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
     228           1 :                 return nil
     229           1 :         }
     230           2 :         refName := p.sharedObjectRefName(meta)
     231           2 :         writer, err := meta.Remote.Storage.CreateObject(refName)
     232           2 :         if err == nil {
     233           2 :                 // The object is empty, just close the writer.
     234           2 :                 err = writer.Close()
     235           2 :         }
     236           2 :         if err != nil {
     237           0 :                 return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
     238           0 :         }
     239           2 :         return nil
     240             : }
     241             : 
     242             : func (p *provider) sharedCreate(
     243             :         _ context.Context,
     244             :         fileType base.FileType,
     245             :         fileNum base.DiskFileNum,
     246             :         locator remote.Locator,
     247             :         opts objstorage.CreateOptions,
     248           2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
     249           2 :         if err := p.sharedCheckInitialized(); err != nil {
     250           0 :                 return nil, objstorage.ObjectMetadata{}, err
     251           0 :         }
     252           2 :         storage, err := p.ensureStorage(locator)
     253           2 :         if err != nil {
     254           0 :                 return nil, objstorage.ObjectMetadata{}, err
     255           0 :         }
     256           2 :         meta := objstorage.ObjectMetadata{
     257           2 :                 DiskFileNum: fileNum,
     258           2 :                 FileType:    fileType,
     259           2 :         }
     260           2 :         meta.Remote.CreatorID = p.remote.shared.creatorID
     261           2 :         meta.Remote.CreatorFileNum = fileNum
     262           2 :         meta.Remote.CleanupMethod = opts.SharedCleanupMethod
     263           2 :         meta.Remote.Locator = locator
     264           2 :         meta.Remote.Storage = storage
     265           2 : 
     266           2 :         objName := remoteObjectName(meta)
     267           2 :         writer, err := storage.CreateObject(objName)
     268           2 :         if err != nil {
     269           0 :                 return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
     270           0 :         }
     271           2 :         return &sharedWritable{
     272           2 :                 p:             p,
     273           2 :                 meta:          meta,
     274           2 :                 storageWriter: writer,
     275           2 :         }, meta, nil
     276             : }
     277             : 
     278             : func (p *provider) remoteOpenForReading(
     279             :         ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
     280           2 : ) (objstorage.Readable, error) {
     281           2 :         if err := p.remoteCheckInitialized(); err != nil {
     282           0 :                 return nil, err
     283           0 :         }
     284             :         // Verify we have a reference on this object; for performance reasons, we only
     285             :         // do this in testing scenarios.
     286           2 :         if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
     287           2 :                 if err := p.sharedCheckInitialized(); err != nil {
     288           0 :                         return nil, err
     289           0 :                 }
     290           2 :                 refName := p.sharedObjectRefName(meta)
     291           2 :                 if _, err := meta.Remote.Storage.Size(refName); err != nil {
     292           0 :                         if meta.Remote.Storage.IsNotExistError(err) {
     293           0 :                                 if opts.MustExist {
     294           0 :                                         p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
     295           0 :                                         // TODO(radu): maybe list references for the object.
     296           0 :                                 }
     297           0 :                                 return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
     298             :                         }
     299           0 :                         return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
     300             :                 }
     301             :         }
     302           2 :         objName := remoteObjectName(meta)
     303           2 :         reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
     304           2 :         if err != nil {
     305           1 :                 if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
     306           0 :                         p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
     307           0 :                         // TODO(radu): maybe list references for the object.
     308           0 :                 }
     309           1 :                 return nil, err
     310             :         }
     311           2 :         return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
     312             : }
     313             : 
     314           2 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
     315           2 :         if err := p.remoteCheckInitialized(); err != nil {
     316           0 :                 return 0, err
     317           0 :         }
     318           2 :         objName := remoteObjectName(meta)
     319           2 :         return meta.Remote.Storage.Size(objName)
     320             : }
     321             : 
     322             : // sharedUnref implements object "removal" with the remote backend. The ref
     323             : // marker object is removed and the backing object is removed only if there are
     324             : // no other ref markers.
     325           2 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
     326           2 :         if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
     327           1 :                 // Never delete objects in this mode.
     328           1 :                 return nil
     329           1 :         }
     330           2 :         if p.isProtected(meta.DiskFileNum) {
     331           2 :                 // TODO(radu): we need a mechanism to unref the object when it becomes
     332           2 :                 // unprotected.
     333           2 :                 return nil
     334           2 :         }
     335             : 
     336           2 :         refName := p.sharedObjectRefName(meta)
     337           2 :         // Tolerate a not-exists error.
     338           2 :         if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     339           0 :                 return err
     340           0 :         }
     341           2 :         otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
     342           2 :         if err != nil {
     343           0 :                 return err
     344           0 :         }
     345           2 :         if len(otherRefs) == 0 {
     346           2 :                 objName := remoteObjectName(meta)
     347           2 :                 if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
     348           0 :                         return err
     349           0 :                 }
     350             :         }
     351           2 :         return nil
     352             : }
     353             : 
     354             : // ensureStorageLocked populates the remote.Storage object for the given
     355             : // locator, if necessary. p.mu must be held.
     356           2 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
     357           2 :         if p.mu.remote.storageObjects == nil {
     358           2 :                 p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
     359           2 :         }
     360           2 :         if res, ok := p.mu.remote.storageObjects[locator]; ok {
     361           2 :                 return res, nil
     362           2 :         }
     363           2 :         res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
     364           2 :         if err != nil {
     365           1 :                 return nil, err
     366           1 :         }
     367             : 
     368           2 :         p.mu.remote.storageObjects[locator] = res
     369           2 :         return res, nil
     370             : }
     371             : 
     372             : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
     373           2 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
     374           2 :         p.mu.Lock()
     375           2 :         defer p.mu.Unlock()
     376           2 :         return p.ensureStorageLocked(locator)
     377           2 : }

Generated by: LCOV version 1.14