LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/remoteobjcat - catalog.go (source / functions) Hit Total Coverage
Test: 2023-09-17 08:17Z be158640 - tests + meta.lcov Lines: 170 225 75.6 %
Date: 2023-09-17 08:18:09 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package remoteobjcat
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "sort"
      11             :         "sync"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             :         "github.com/cockroachdb/pebble/objstorage/remote"
      17             :         "github.com/cockroachdb/pebble/record"
      18             :         "github.com/cockroachdb/pebble/vfs"
      19             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      20             : )
      21             : 
// Catalog is used to manage the on-disk remote object catalog.
//
// The catalog file is a log of records, where each record is an encoded
// VersionEdit. A newly created catalog file starts with a VersionEdit that
// recreates the full current state (creator ID and all objects); later edits
// are appended after it. An atomicfs marker records which catalog file is the
// current one.
//
// Catalogs are obtained from Open. SetCreatorID and ApplyBatch acquire mu
// before touching the state below.
type Catalog struct {
	// fs and dirname identify the directory holding the catalog files and the
	// catalog marker.
	fs      vfs.FS
	dirname string
	mu      struct {
		sync.Mutex

		// creatorID is loaded from the catalog file or set by SetCreatorID.
		// objects is the current set of remote objects, keyed by FileNum.
		creatorID objstorage.CreatorID
		objects   map[base.DiskFileNum]RemoteObjectMetadata

		// marker points at the current catalog file; it is moved every time a
		// new catalog file is created.
		marker *atomicfs.Marker

		// catalogFile is the open catalog file and catalogRecWriter is the
		// record writer appending to it. Both are nil until the first write
		// after Open, and are reset to nil when the file is closed.
		catalogFile      vfs.File
		catalogRecWriter *record.Writer

		// rotationHelper decides (once the file reaches rotateFileSize) whether
		// the catalog should be rewritten into a new file.
		rotationHelper record.RotationHelper

		// catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
		// it is the filename of the last catalog file.
		catalogFilename string
	}
}
      47             : 
// RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
//
// Entries are added to the catalog through Batch.AddObject and
// Catalog.ApplyBatch, and are returned by Open as part of CatalogContents.
type RemoteObjectMetadata struct {
	// FileNum is the identifier for the object within the context of a single DB
	// instance.
	FileNum base.DiskFileNum
	// FileType is the type of the object. Only certain FileTypes are possible.
	FileType base.FileType
	// CreatorID identifies the DB instance that originally created the object.
	CreatorID objstorage.CreatorID
	// CreatorFileNum is the identifier for the object within the context of the
	// DB instance that originally created the object.
	CreatorFileNum base.DiskFileNum
	// CleanupMethod indicates the method for cleaning up unused shared objects.
	CleanupMethod objstorage.SharedCleanupMethod
	// Locator identifies a remote.Storage implementation.
	Locator remote.Locator
	// CustomObjectName (if it is set) overrides the object name that is normally
	// derived from the CreatorID and CreatorFileNum.
	CustomObjectName string
}
      68             : 
      69             : const (
      70             :         catalogFilenameBase = "REMOTE-OBJ-CATALOG"
      71             :         catalogMarkerName   = "remote-obj-catalog"
      72             : 
      73             :         // We create a new file when the size exceeds 1MB (and some other conditions
      74             :         // hold; see record.RotationHelper).
      75             :         rotateFileSize = 1024 * 1024 // 1MB
      76             : )
      77             : 
// CatalogContents contains the remote objects in the catalog, as returned by
// Open.
type CatalogContents struct {
	// CreatorID is the creator ID recorded in the catalog, if it is set.
	CreatorID objstorage.CreatorID
	// Objects lists the objects in the catalog; Open returns them sorted by
	// FileNum.
	Objects []RemoteObjectMetadata
}
      84             : 
      85             : // Open creates a Catalog and loads any existing catalog file, returning the
      86             : // creator ID (if it is set) and the contents.
      87           2 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
      88           2 :         c := &Catalog{
      89           2 :                 fs:      fs,
      90           2 :                 dirname: dirname,
      91           2 :         }
      92           2 :         c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
      93           2 : 
      94           2 :         var err error
      95           2 :         c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
      96           2 :         if err != nil {
      97           0 :                 return nil, CatalogContents{}, err
      98           0 :         }
      99             :         // If the filename is empty, there is no existing catalog.
     100           2 :         if c.mu.catalogFilename != "" {
     101           2 :                 if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
     102           0 :                         return nil, CatalogContents{}, err
     103           0 :                 }
     104           2 :                 if err := c.mu.marker.RemoveObsolete(); err != nil {
     105           0 :                         return nil, CatalogContents{}, err
     106           0 :                 }
     107             :                 // TODO(radu): remove obsolete catalog files.
     108             :         }
     109           2 :         res := CatalogContents{
     110           2 :                 CreatorID: c.mu.creatorID,
     111           2 :                 Objects:   make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
     112           2 :         }
     113           2 :         for _, meta := range c.mu.objects {
     114           2 :                 res.Objects = append(res.Objects, meta)
     115           2 :         }
     116             :         // Sort the objects so the function is deterministic.
     117           2 :         sort.Slice(res.Objects, func(i, j int) bool {
     118           2 :                 return res.Objects[i].FileNum.FileNum() < res.Objects[j].FileNum.FileNum()
     119           2 :         })
     120           2 :         return c, res, nil
     121             : }
     122             : 
     123             : // SetCreatorID sets the creator ID. If it is already set, it must match.
     124           2 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
     125           2 :         if !id.IsSet() {
     126           0 :                 return errors.AssertionFailedf("attempt to unset CreatorID")
     127           0 :         }
     128             : 
     129           2 :         c.mu.Lock()
     130           2 :         defer c.mu.Unlock()
     131           2 : 
     132           2 :         if c.mu.creatorID.IsSet() {
     133           1 :                 if c.mu.creatorID != id {
     134           1 :                         return errors.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
     135           1 :                 }
     136           1 :                 return nil
     137             :         }
     138             : 
     139           2 :         ve := VersionEdit{CreatorID: id}
     140           2 :         if err := c.writeToCatalogFileLocked(&ve); err != nil {
     141           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog: %v", err)
     142           0 :         }
     143           2 :         c.mu.creatorID = id
     144           2 :         return nil
     145             : }
     146             : 
     147             : // Close any open files.
     148           2 : func (c *Catalog) Close() error {
     149           2 :         return c.closeCatalogFile()
     150           2 : }
     151             : 
     152           2 : func (c *Catalog) closeCatalogFile() error {
     153           2 :         if c.mu.catalogFile == nil {
     154           2 :                 return nil
     155           2 :         }
     156           2 :         err1 := c.mu.catalogRecWriter.Close()
     157           2 :         err2 := c.mu.catalogFile.Close()
     158           2 :         c.mu.catalogRecWriter = nil
     159           2 :         c.mu.catalogFile = nil
     160           2 :         if err1 != nil {
     161           0 :                 return err1
     162           0 :         }
     163           2 :         return err2
     164             : }
     165             : 
     166             : // Batch is used to perform multiple object additions/deletions at once.
     167             : type Batch struct {
     168             :         ve VersionEdit
     169             : }
     170             : 
     171             : // AddObject adds a new object to the batch.
     172             : //
     173             : // The given FileNum must be new - it must not match that of any object that was
     174             : // ever in the catalog.
     175           2 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
     176           2 :         b.ve.NewObjects = append(b.ve.NewObjects, meta)
     177           2 : }
     178             : 
     179             : // DeleteObject adds an object removal to the batch.
     180           2 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
     181           2 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
     182           2 : }
     183             : 
     184             : // Reset clears the batch.
     185           2 : func (b *Batch) Reset() {
     186           2 :         b.ve.NewObjects = b.ve.NewObjects[:0]
     187           2 :         b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
     188           2 : }
     189             : 
     190             : // IsEmpty returns true if the batch is empty.
     191           2 : func (b *Batch) IsEmpty() bool {
     192           2 :         return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
     193           2 : }
     194             : 
     195             : // Copy returns a copy of the Batch.
     196           2 : func (b *Batch) Copy() Batch {
     197           2 :         var res Batch
     198           2 :         if len(b.ve.NewObjects) > 0 {
     199           2 :                 res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
     200           2 :                 copy(res.ve.NewObjects, b.ve.NewObjects)
     201           2 :         }
     202           2 :         if len(b.ve.DeletedObjects) > 0 {
     203           2 :                 res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
     204           2 :                 copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
     205           2 :         }
     206           2 :         return res
     207             : }
     208             : 
     209             : // Append merges two batches.
     210           0 : func (b *Batch) Append(other Batch) {
     211           0 :         b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
     212           0 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
     213           0 : }
     214             : 
     215             : // ApplyBatch applies a batch of updates; returns after the change is stably
     216             : // recorded on storage.
     217           2 : func (c *Catalog) ApplyBatch(b Batch) error {
     218           2 :         c.mu.Lock()
     219           2 :         defer c.mu.Unlock()
     220           2 : 
     221           2 :         // Sanity checks.
     222           2 :         toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
     223           2 :         exists := func(n base.DiskFileNum) bool {
     224           2 :                 _, ok := c.mu.objects[n]
     225           2 :                 if !ok {
     226           2 :                         _, ok = toAdd[n]
     227           2 :                 }
     228           2 :                 return ok
     229             :         }
     230           2 :         for _, meta := range b.ve.NewObjects {
     231           2 :                 if exists(meta.FileNum) {
     232           1 :                         return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
     233           1 :                 }
     234           2 :                 toAdd[meta.FileNum] = struct{}{}
     235             :         }
     236           2 :         for _, n := range b.ve.DeletedObjects {
     237           2 :                 if !exists(n) {
     238           1 :                         return errors.AssertionFailedf("deleting non-existent object %s", n)
     239           1 :                 }
     240             :         }
     241             : 
     242           2 :         if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
     243           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog: %v", err)
     244           0 :         }
     245             : 
     246             :         // Add new objects before deleting any objects. This allows for cases where
     247             :         // the same batch adds and deletes an object.
     248           2 :         for _, meta := range b.ve.NewObjects {
     249           2 :                 c.mu.objects[meta.FileNum] = meta
     250           2 :         }
     251           2 :         for _, n := range b.ve.DeletedObjects {
     252           2 :                 delete(c.mu.objects, n)
     253           2 :         }
     254             : 
     255           2 :         return nil
     256             : }
     257             : 
     258           2 : func (c *Catalog) loadFromCatalogFile(filename string) error {
     259           2 :         catalogPath := c.fs.PathJoin(c.dirname, filename)
     260           2 :         f, err := c.fs.Open(catalogPath)
     261           2 :         if err != nil {
     262           0 :                 return errors.Wrapf(
     263           0 :                         err, "pebble: could not open remote object catalog file %q for DB %q",
     264           0 :                         errors.Safe(filename), c.dirname,
     265           0 :                 )
     266           0 :         }
     267           2 :         defer f.Close()
     268           2 :         rr := record.NewReader(f, 0 /* logNum */)
     269           2 :         for {
     270           2 :                 r, err := rr.Next()
     271           2 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     272           2 :                         break
     273             :                 }
     274           2 :                 if err != nil {
     275           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     276           0 :                                 errors.Safe(filename))
     277           0 :                 }
     278           2 :                 var ve VersionEdit
     279           2 :                 if err := ve.Decode(r); err != nil {
     280           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     281           0 :                                 errors.Safe(filename))
     282           0 :                 }
     283             :                 // Apply the version edit to the current state.
     284           2 :                 if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
     285           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     286           0 :                                 errors.Safe(filename))
     287           0 :                 }
     288             :         }
     289           2 :         return nil
     290             : }
     291             : 
     292             : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
     293             : // Creates a new file if this is the first write.
     294           2 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
     295           2 :         c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
     296           2 :         snapshotSize := int64(len(c.mu.objects))
     297           2 : 
     298           2 :         var shouldRotate bool
     299           2 :         if c.mu.catalogFile == nil {
     300           2 :                 shouldRotate = true
     301           2 :         } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
     302           1 :                 shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
     303           1 :         }
     304             : 
     305           2 :         if shouldRotate {
     306           2 :                 if c.mu.catalogFile != nil {
     307           1 :                         if err := c.closeCatalogFile(); err != nil {
     308           0 :                                 return err
     309           0 :                         }
     310             :                 }
     311           2 :                 if err := c.createNewCatalogFileLocked(); err != nil {
     312           0 :                         return err
     313           0 :                 }
     314           2 :                 c.mu.rotationHelper.Rotate(snapshotSize)
     315             :         }
     316           2 :         return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
     317             : }
     318             : 
     319           2 : func makeCatalogFilename(iter uint64) string {
     320           2 :         return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
     321           2 : }
     322             : 
     323             : // createNewCatalogFileLocked creates a new catalog file, populates it with the
     324             : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
     325           2 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
     326           2 :         if c.mu.catalogFile != nil {
     327           0 :                 return errors.AssertionFailedf("catalogFile already open")
     328           0 :         }
     329           2 :         filename := makeCatalogFilename(c.mu.marker.NextIter())
     330           2 :         filepath := c.fs.PathJoin(c.dirname, filename)
     331           2 :         file, err := c.fs.Create(filepath)
     332           2 :         if err != nil {
     333           0 :                 return err
     334           0 :         }
     335           2 :         recWriter := record.NewWriter(file)
     336           2 :         err = func() error {
     337           2 :                 // Create a VersionEdit that gets us from an empty catalog to the current state.
     338           2 :                 var ve VersionEdit
     339           2 :                 ve.CreatorID = c.mu.creatorID
     340           2 :                 ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
     341           2 :                 for _, meta := range c.mu.objects {
     342           2 :                         ve.NewObjects = append(ve.NewObjects, meta)
     343           2 :                 }
     344           2 :                 if err := writeRecord(&ve, file, recWriter); err != nil {
     345           0 :                         return err
     346           0 :                 }
     347             : 
     348             :                 // Move the marker to the new filename. Move handles syncing the data
     349             :                 // directory as well.
     350           2 :                 if err := c.mu.marker.Move(filename); err != nil {
     351           0 :                         return errors.Wrap(err, "moving marker")
     352           0 :                 }
     353             : 
     354           2 :                 return nil
     355             :         }()
     356             : 
     357           2 :         if err != nil {
     358           0 :                 _ = recWriter.Close()
     359           0 :                 _ = file.Close()
     360           0 :                 _ = c.fs.Remove(filepath)
     361           0 :                 return err
     362           0 :         }
     363             : 
     364             :         // Remove any previous file (ignoring any error).
     365           2 :         if c.mu.catalogFilename != "" {
     366           2 :                 _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
     367           2 :         }
     368             : 
     369           2 :         c.mu.catalogFile = file
     370           2 :         c.mu.catalogRecWriter = recWriter
     371           2 :         c.mu.catalogFilename = filename
     372           2 :         return nil
     373             : }
     374             : 
     375           2 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
     376           2 :         w, err := recWriter.Next()
     377           2 :         if err != nil {
     378           0 :                 return err
     379           0 :         }
     380           2 :         if err := ve.Encode(w); err != nil {
     381           0 :                 return err
     382           0 :         }
     383           2 :         if err := recWriter.Flush(); err != nil {
     384           0 :                 return err
     385           0 :         }
     386           2 :         return file.Sync()
     387             : }

Generated by: LCOV version 1.14