LCOV - code coverage report
Current view: top level - pebble/objstorage/objstorageprovider/remoteobjcat - catalog.go (source / functions) Hit Total Coverage
Test: 2024-06-15 08:15Z 0e4fb77b - tests only.lcov Lines: 187 248 75.4 %
Date: 2024-06-15 08:16:19 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package remoteobjcat
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "fmt"
      10             :         "io"
      11             :         "path/filepath"
      12             :         "slices"
      13             :         "sync"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/remote"
      19             :         "github.com/cockroachdb/pebble/record"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             :         "github.com/cockroachdb/pebble/vfs/atomicfs"
      22             : )
      23             : 
// Catalog is used to manage the on-disk remote object catalog.
//
// The catalog file is a log of records, where each record is an encoded
// VersionEdit.
type Catalog struct {
	// fs is the filesystem on which catalog files are opened, created and
	// removed.
	fs vfs.FS
	// dirname is the directory that holds the catalog files; it is joined with
	// a catalog filename to form the path of a catalog file.
	dirname string
	// mu groups the mutex together with the mutable catalog state.
	mu struct {
		sync.Mutex

		// creatorID is the current creator ID. It is populated when an existing
		// catalog file is loaded and by SetCreatorID.
		creatorID objstorage.CreatorID
		// objects is the in-memory view of the catalog, keyed by FileNum.
		objects map[base.DiskFileNum]RemoteObjectMetadata

		// marker is the atomicfs marker (named catalogMarkerName) that is moved
		// to the new catalog file whenever one is created.
		marker *atomicfs.Marker

		// catalogFile is the currently open catalog file and catalogRecWriter is
		// the record writer layered on top of it. Both are nil until the first
		// write and after the file has been closed.
		catalogFile      vfs.File
		catalogRecWriter *record.Writer

		// rotationHelper is consulted, once the current file has reached
		// rotateFileSize, to decide whether to switch to a new catalog file.
		rotationHelper record.RotationHelper

		// catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
		// it is the filename of the last catalog file.
		catalogFilename string
	}
}
      49             : 
// RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
//
// The Catalog keeps one RemoteObjectMetadata per object in memory, keyed by
// FileNum.
type RemoteObjectMetadata struct {
	// FileNum is the identifier for the object within the context of a single DB
	// instance.
	FileNum base.DiskFileNum
	// FileType is the type of the object. Only certain FileTypes are possible.
	FileType base.FileType
	// CreatorID identifies the DB instance that originally created the object.
	CreatorID objstorage.CreatorID
	// CreatorFileNum is the identifier for the object within the context of the
	// DB instance that originally created the object.
	CreatorFileNum base.DiskFileNum
	// CleanupMethod indicates the method for cleaning up unused shared objects.
	CleanupMethod objstorage.SharedCleanupMethod
	// Locator identifies a remote.Storage implementation.
	Locator remote.Locator
	// CustomObjectName (if it is set) overrides the object name that is normally
	// derived from the CreatorID and CreatorFileNum.
	CustomObjectName string
}
      70             : 
const (
	// catalogFilenameBase is the prefix of every catalog filename; see
	// makeCatalogFilename.
	catalogFilenameBase = "REMOTE-OBJ-CATALOG"
	// catalogMarkerName is the name passed to atomicfs.LocateMarker to find the
	// marker that identifies the current catalog file.
	catalogMarkerName = "remote-obj-catalog"

	// We create a new file when the size reaches 1MB (and some other conditions
	// hold; see record.RotationHelper).
	rotateFileSize = 1024 * 1024 // 1MB
)
      79             : 
// CatalogContents contains the remote objects in the catalog.
type CatalogContents struct {
	// CreatorID, if it is set.
	CreatorID objstorage.CreatorID
	// Objects lists the metadata of every object in the catalog. Open returns
	// this slice sorted by FileNum.
	Objects []RemoteObjectMetadata
}
      86             : 
      87             : // Open creates a Catalog and loads any existing catalog file, returning the
      88             : // creator ID (if it is set) and the contents.
      89           1 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
      90           1 :         c := &Catalog{
      91           1 :                 fs:      fs,
      92           1 :                 dirname: dirname,
      93           1 :         }
      94           1 :         c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
      95           1 : 
      96           1 :         var err error
      97           1 :         c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
      98           1 :         if err != nil {
      99           0 :                 return nil, CatalogContents{}, err
     100           0 :         }
     101             :         // If the filename is empty, there is no existing catalog.
     102           1 :         if c.mu.catalogFilename != "" {
     103           1 :                 if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
     104           0 :                         return nil, CatalogContents{}, err
     105           0 :                 }
     106           1 :                 if err := c.mu.marker.RemoveObsolete(); err != nil {
     107           0 :                         return nil, CatalogContents{}, err
     108           0 :                 }
     109             :                 // TODO(radu): remove obsolete catalog files.
     110             :         }
     111           1 :         res := CatalogContents{
     112           1 :                 CreatorID: c.mu.creatorID,
     113           1 :                 Objects:   make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
     114           1 :         }
     115           1 :         for _, meta := range c.mu.objects {
     116           1 :                 res.Objects = append(res.Objects, meta)
     117           1 :         }
     118             :         // Sort the objects so the function is deterministic.
     119           1 :         slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
     120           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     121           1 :         })
     122           1 :         return c, res, nil
     123             : }
     124             : 
     125             : // SetCreatorID sets the creator ID. If it is already set, it must match.
     126           1 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
     127           1 :         if !id.IsSet() {
     128           0 :                 return base.AssertionFailedf("attempt to unset CreatorID")
     129           0 :         }
     130             : 
     131           1 :         c.mu.Lock()
     132           1 :         defer c.mu.Unlock()
     133           1 : 
     134           1 :         if c.mu.creatorID.IsSet() {
     135           1 :                 if c.mu.creatorID != id {
     136           1 :                         return base.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
     137           1 :                 }
     138           1 :                 return nil
     139             :         }
     140             : 
     141           1 :         ve := VersionEdit{CreatorID: id}
     142           1 :         if err := c.writeToCatalogFileLocked(&ve); err != nil {
     143           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     144           0 :         }
     145           1 :         c.mu.creatorID = id
     146           1 :         return nil
     147             : }
     148             : 
     149             : // Close any open files.
     150           1 : func (c *Catalog) Close() error {
     151           1 :         var err error
     152           1 :         if c.mu.marker != nil {
     153           1 :                 err = c.mu.marker.Close()
     154           1 :                 c.mu.marker = nil
     155           1 :         }
     156           1 :         return errors.CombineErrors(err, c.closeCatalogFile())
     157             : }
     158             : 
     159           1 : func (c *Catalog) closeCatalogFile() error {
     160           1 :         if c.mu.catalogFile == nil {
     161           1 :                 return nil
     162           1 :         }
     163           1 :         err1 := c.mu.catalogRecWriter.Close()
     164           1 :         err2 := c.mu.catalogFile.Close()
     165           1 :         c.mu.catalogRecWriter = nil
     166           1 :         c.mu.catalogFile = nil
     167           1 :         if err1 != nil {
     168           0 :                 return err1
     169           0 :         }
     170           1 :         return err2
     171             : }
     172             : 
// Batch is used to perform multiple object additions/deletions at once.
//
// The zero value is an empty batch. A batch is applied to a Catalog with
// ApplyBatch.
type Batch struct {
	// ve accumulates the additions (NewObjects) and deletions (DeletedObjects)
	// of the batch.
	ve VersionEdit
}
     177             : 
     178             : // AddObject adds a new object to the batch.
     179             : //
     180             : // The given FileNum must be new - it must not match that of any object that was
     181             : // ever in the catalog.
     182           1 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
     183           1 :         b.ve.NewObjects = append(b.ve.NewObjects, meta)
     184           1 : }
     185             : 
     186             : // DeleteObject adds an object removal to the batch.
     187           1 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
     188           1 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
     189           1 : }
     190             : 
     191             : // Reset clears the batch.
     192           1 : func (b *Batch) Reset() {
     193           1 :         b.ve.NewObjects = b.ve.NewObjects[:0]
     194           1 :         b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
     195           1 : }
     196             : 
     197             : // IsEmpty returns true if the batch is empty.
     198           1 : func (b *Batch) IsEmpty() bool {
     199           1 :         return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
     200           1 : }
     201             : 
     202             : // Copy returns a copy of the Batch.
     203           1 : func (b *Batch) Copy() Batch {
     204           1 :         var res Batch
     205           1 :         if len(b.ve.NewObjects) > 0 {
     206           1 :                 res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
     207           1 :                 copy(res.ve.NewObjects, b.ve.NewObjects)
     208           1 :         }
     209           1 :         if len(b.ve.DeletedObjects) > 0 {
     210           1 :                 res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
     211           1 :                 copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
     212           1 :         }
     213           1 :         return res
     214             : }
     215             : 
     216             : // Append merges two batches.
     217           0 : func (b *Batch) Append(other Batch) {
     218           0 :         b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
     219           0 :         b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
     220           0 : }
     221             : 
     222             : // ApplyBatch applies a batch of updates; returns after the change is stably
     223             : // recorded on storage.
     224           1 : func (c *Catalog) ApplyBatch(b Batch) error {
     225           1 :         c.mu.Lock()
     226           1 :         defer c.mu.Unlock()
     227           1 : 
     228           1 :         // Sanity checks.
     229           1 :         toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
     230           1 :         exists := func(n base.DiskFileNum) bool {
     231           1 :                 _, ok := c.mu.objects[n]
     232           1 :                 if !ok {
     233           1 :                         _, ok = toAdd[n]
     234           1 :                 }
     235           1 :                 return ok
     236             :         }
     237           1 :         for _, meta := range b.ve.NewObjects {
     238           1 :                 if exists(meta.FileNum) {
     239           1 :                         return base.AssertionFailedf("adding existing object %s", meta.FileNum)
     240           1 :                 }
     241           1 :                 toAdd[meta.FileNum] = struct{}{}
     242             :         }
     243           1 :         for _, n := range b.ve.DeletedObjects {
     244           1 :                 if !exists(n) {
     245           1 :                         return base.AssertionFailedf("deleting non-existent object %s", n)
     246           1 :                 }
     247             :         }
     248             : 
     249           1 :         if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
     250           0 :                 return errors.Wrapf(err, "pebble: could not write to remote object catalog")
     251           0 :         }
     252             : 
     253             :         // Add new objects before deleting any objects. This allows for cases where
     254             :         // the same batch adds and deletes an object.
     255           1 :         for _, meta := range b.ve.NewObjects {
     256           1 :                 c.mu.objects[meta.FileNum] = meta
     257           1 :         }
     258           1 :         for _, n := range b.ve.DeletedObjects {
     259           1 :                 delete(c.mu.objects, n)
     260           1 :         }
     261             : 
     262           1 :         return nil
     263             : }
     264             : 
     265           1 : func (c *Catalog) loadFromCatalogFile(filename string) error {
     266           1 :         catalogPath := c.fs.PathJoin(c.dirname, filename)
     267           1 :         f, err := c.fs.Open(catalogPath)
     268           1 :         if err != nil {
     269           0 :                 return errors.Wrapf(
     270           0 :                         err, "pebble: could not open remote object catalog file %q for DB %q",
     271           0 :                         errors.Safe(filename), c.dirname,
     272           0 :                 )
     273           0 :         }
     274           1 :         defer f.Close()
     275           1 :         rr := record.NewReader(f, 0 /* logNum */)
     276           1 :         for {
     277           1 :                 r, err := rr.Next()
     278           1 :                 if err == io.EOF || record.IsInvalidRecord(err) {
     279           1 :                         break
     280             :                 }
     281           1 :                 if err != nil {
     282           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     283           0 :                                 errors.Safe(filename))
     284           0 :                 }
     285           1 :                 var ve VersionEdit
     286           1 :                 if err := ve.Decode(r); err != nil {
     287           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     288           0 :                                 errors.Safe(filename))
     289           0 :                 }
     290             :                 // Apply the version edit to the current state.
     291           1 :                 if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
     292           0 :                         return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
     293           0 :                                 errors.Safe(filename))
     294           0 :                 }
     295             :         }
     296           1 :         return nil
     297             : }
     298             : 
     299             : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
     300             : // Creates a new file if this is the first write.
     301           1 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
     302           1 :         c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
     303           1 :         snapshotSize := int64(len(c.mu.objects))
     304           1 : 
     305           1 :         var shouldRotate bool
     306           1 :         if c.mu.catalogFile == nil {
     307           1 :                 shouldRotate = true
     308           1 :         } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
     309           1 :                 shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
     310           1 :         }
     311             : 
     312           1 :         if shouldRotate {
     313           1 :                 if c.mu.catalogFile != nil {
     314           1 :                         if err := c.closeCatalogFile(); err != nil {
     315           0 :                                 return err
     316           0 :                         }
     317             :                 }
     318           1 :                 if err := c.createNewCatalogFileLocked(); err != nil {
     319           0 :                         return err
     320           0 :                 }
     321           1 :                 c.mu.rotationHelper.Rotate(snapshotSize)
     322             :         }
     323           1 :         return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
     324             : }
     325             : 
     326           1 : func makeCatalogFilename(iter uint64) string {
     327           1 :         return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
     328           1 : }
     329             : 
     330             : // createNewCatalogFileLocked creates a new catalog file, populates it with the
     331             : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
     332           1 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
     333           1 :         if c.mu.catalogFile != nil {
     334           0 :                 return base.AssertionFailedf("catalogFile already open")
     335           0 :         }
     336           1 :         filename := makeCatalogFilename(c.mu.marker.NextIter())
     337           1 :         filepath := c.fs.PathJoin(c.dirname, filename)
     338           1 :         file, err := c.fs.Create(filepath, "pebble-manifest")
     339           1 :         if err != nil {
     340           0 :                 return err
     341           0 :         }
     342           1 :         recWriter := record.NewWriter(file)
     343           1 :         err = func() error {
     344           1 :                 // Create a VersionEdit that gets us from an empty catalog to the current state.
     345           1 :                 var ve VersionEdit
     346           1 :                 ve.CreatorID = c.mu.creatorID
     347           1 :                 ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
     348           1 :                 for _, meta := range c.mu.objects {
     349           1 :                         ve.NewObjects = append(ve.NewObjects, meta)
     350           1 :                 }
     351           1 :                 if err := writeRecord(&ve, file, recWriter); err != nil {
     352           0 :                         return err
     353           0 :                 }
     354             : 
     355             :                 // Move the marker to the new filename. Move handles syncing the data
     356             :                 // directory as well.
     357           1 :                 if err := c.mu.marker.Move(filename); err != nil {
     358           0 :                         return errors.Wrap(err, "moving marker")
     359           0 :                 }
     360             : 
     361           1 :                 return nil
     362             :         }()
     363             : 
     364           1 :         if err != nil {
     365           0 :                 _ = recWriter.Close()
     366           0 :                 _ = file.Close()
     367           0 :                 _ = c.fs.Remove(filepath)
     368           0 :                 return err
     369           0 :         }
     370             : 
     371             :         // Remove any previous file (ignoring any error).
     372           1 :         if c.mu.catalogFilename != "" {
     373           1 :                 _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
     374           1 :         }
     375             : 
     376           1 :         c.mu.catalogFile = file
     377           1 :         c.mu.catalogRecWriter = recWriter
     378           1 :         c.mu.catalogFilename = filename
     379           1 :         return nil
     380             : }
     381             : 
     382             : // Checkpoint copies catalog state to a file in the specified directory
     383           1 : func (c *Catalog) Checkpoint(fs vfs.FS, dir string) error {
     384           1 :         c.mu.Lock()
     385           1 :         defer c.mu.Unlock()
     386           1 : 
     387           1 :         // NB: Every write to recWriter is flushed. We don't need to worry about
     388           1 :         // this new file descriptor not getting all the saved catalog entries.
     389           1 :         existingCatalogFilepath := filepath.Join(c.dirname, c.mu.catalogFilename)
     390           1 :         destPath := filepath.Join(dir, c.mu.catalogFilename)
     391           1 :         if err := vfs.CopyAcrossFS(c.fs, existingCatalogFilepath, fs, destPath); err != nil {
     392           0 :                 return err
     393           0 :         }
     394           1 :         catalogMarker, _, err := atomicfs.LocateMarker(fs, dir, catalogMarkerName)
     395           1 :         if err != nil {
     396           0 :                 return err
     397           0 :         }
     398           1 :         if err := catalogMarker.Move(c.mu.catalogFilename); err != nil {
     399           0 :                 return err
     400           0 :         }
     401           1 :         return catalogMarker.Close()
     402             : }
     403             : 
     404           1 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
     405           1 :         w, err := recWriter.Next()
     406           1 :         if err != nil {
     407           0 :                 return err
     408           0 :         }
     409           1 :         if err := ve.Encode(w); err != nil {
     410           0 :                 return err
     411           0 :         }
     412           1 :         if err := recWriter.Flush(); err != nil {
     413           0 :                 return err
     414           0 :         }
     415           1 :         return file.Sync()
     416             : }

Generated by: LCOV version 1.14