Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/objstorage"
13 : "github.com/cockroachdb/pebble/vfs"
14 : )
15 :
16 1 : func (p *provider) vfsPath(fileType base.FileType, fileNum base.DiskFileNum) string {
17 1 : return base.MakeFilepath(p.st.FS, p.st.FSDirName, fileType, fileNum)
18 1 : }
19 :
20 : func (p *provider) vfsOpenForReading(
21 : ctx context.Context,
22 : fileType base.FileType,
23 : fileNum base.DiskFileNum,
24 : opts objstorage.OpenOptions,
25 1 : ) (objstorage.Readable, error) {
26 1 : filename := p.vfsPath(fileType, fileNum)
27 1 : file, err := p.st.FS.Open(filename, vfs.RandomReadsOption)
28 1 : if err != nil {
29 0 : if opts.MustExist {
30 0 : base.MustExist(p.st.FS, filename, p.st.Logger, err)
31 0 : }
32 0 : return nil, err
33 : }
34 1 : return newFileReadable(file, p.st.FS, p.st.Local.ReadaheadConfig, filename)
35 : }
36 :
37 : func (p *provider) vfsCreate(
38 : _ context.Context,
39 : fileType base.FileType,
40 : fileNum base.DiskFileNum,
41 : category vfs.DiskWriteCategory,
42 1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
43 1 : filename := p.vfsPath(fileType, fileNum)
44 1 : file, err := p.st.FS.Create(filename, category)
45 1 : if err != nil {
46 0 : return nil, objstorage.ObjectMetadata{}, err
47 0 : }
48 1 : file = vfs.NewSyncingFile(file, vfs.SyncingFileOptions{
49 1 : NoSyncOnClose: p.st.NoSyncOnClose,
50 1 : BytesPerSync: p.st.BytesPerSync,
51 1 : })
52 1 : meta := objstorage.ObjectMetadata{
53 1 : DiskFileNum: fileNum,
54 1 : FileType: fileType,
55 1 : }
56 1 : return newFileBufferedWritable(file), meta, nil
57 : }
58 :
59 1 : func (p *provider) vfsRemove(fileType base.FileType, fileNum base.DiskFileNum) error {
60 1 : return p.st.FSCleaner.Clean(p.st.FS, fileType, p.vfsPath(fileType, fileNum))
61 1 : }
62 :
63 : // vfsInit finds any local FS objects.
64 1 : func (p *provider) vfsInit() error {
65 1 : listing := p.st.FSDirInitialListing
66 1 : if listing == nil {
67 0 : var err error
68 0 : listing, err = p.st.FS.List(p.st.FSDirName)
69 0 : if err != nil {
70 0 : return errors.Wrapf(err, "pebble: could not list store directory")
71 0 : }
72 : }
73 :
74 1 : for _, filename := range listing {
75 1 : fileType, fileNum, ok := base.ParseFilename(p.st.FS, filename)
76 1 : if ok && fileType == base.FileTypeTable {
77 1 : o := objstorage.ObjectMetadata{
78 1 : FileType: fileType,
79 1 : DiskFileNum: fileNum,
80 1 : }
81 1 : p.mu.knownObjects[o.DiskFileNum] = o
82 1 : }
83 : }
84 1 : return nil
85 : }
86 :
87 1 : func (p *provider) vfsSync() error {
88 1 : p.mu.Lock()
89 1 : counterVal := p.mu.localObjectsChangeCounter
90 1 : lastSynced := p.mu.localObjectsChangeCounterSynced
91 1 : p.mu.Unlock()
92 1 :
93 1 : if lastSynced >= counterVal {
94 1 : return nil
95 1 : }
96 1 : if err := p.fsDir.Sync(); err != nil {
97 0 : return err
98 0 : }
99 :
100 1 : p.mu.Lock()
101 1 : if p.mu.localObjectsChangeCounterSynced < counterVal {
102 1 : p.mu.localObjectsChangeCounterSynced = counterVal
103 1 : }
104 1 : p.mu.Unlock()
105 1 :
106 1 : return nil
107 : }
108 :
109 1 : func (p *provider) vfsSize(fileType base.FileType, fileNum base.DiskFileNum) (int64, error) {
110 1 : filename := p.vfsPath(fileType, fileNum)
111 1 : stat, err := p.st.FS.Stat(filename)
112 1 : if err != nil {
113 1 : return 0, err
114 1 : }
115 1 : return stat.Size(), nil
116 : }
|