Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "runtime"
11 : "sync"
12 : "sync/atomic"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/redact"
22 : )
23 :
24 : // remoteSubsystem contains the provider fields related to remote storage.
25 : // All fields remain unset if remote storage is not configured.
26 : type remoteSubsystem struct {
27 : catalog *remoteobjcat.Catalog
28 : // catalogSyncMutex is used to correctly serialize two sharedSync operations.
29 : // It must be acquired before the provider mutex.
30 : catalogSyncMutex sync.Mutex
31 :
32 : cache *sharedcache.Cache
33 :
34 : // shared contains the fields relevant to shared objects, i.e. objects that
35 : // are created by Pebble and potentially shared between Pebble instances.
36 : shared struct {
37 : // initialized guards access to the creatorID field.
38 : initialized atomic.Bool
39 : creatorID objstorage.CreatorID
40 : initOnce sync.Once
41 :
42 : // checkRefsOnOpen controls whether we check the ref marker file when opening
43 : // an object. Normally this is true when invariants are enabled (but the provider
44 : // test tweaks this field).
45 : checkRefsOnOpen bool
46 : }
47 : }
48 :
49 : // remoteInit initializes the remote object subsystem (if configured) and finds
50 : // any remote objects.
51 2 : func (p *provider) remoteInit() error {
52 2 : if p.st.Remote.StorageFactory == nil {
53 2 : return nil
54 2 : }
55 2 : catalog, contents, err := remoteobjcat.Open(p.st.FS, p.st.FSDirName)
56 2 : if err != nil {
57 0 : return errors.Wrapf(err, "pebble: could not open remote object catalog")
58 0 : }
59 2 : p.remote.catalog = catalog
60 2 : p.remote.shared.checkRefsOnOpen = invariants.Enabled
61 2 :
62 2 : // The creator ID may or may not be initialized yet.
63 2 : if contents.CreatorID.IsSet() {
64 2 : p.remote.initShared(contents.CreatorID)
65 2 : p.st.Logger.Infof("remote storage configured; creatorID = %s", contents.CreatorID)
66 2 : } else {
67 2 : p.st.Logger.Infof("remote storage configured; no creatorID yet")
68 2 : }
69 :
70 2 : if p.st.Remote.CacheSizeBytes > 0 {
71 1 : const defaultBlockSize = 32 * 1024
72 1 : blockSize := p.st.Remote.CacheBlockSize
73 1 : if blockSize == 0 {
74 1 : blockSize = defaultBlockSize
75 1 : }
76 :
77 1 : const defaultShardingBlockSize = 1024 * 1024
78 1 : shardingBlockSize := p.st.Remote.ShardingBlockSize
79 1 : if shardingBlockSize == 0 {
80 1 : shardingBlockSize = defaultShardingBlockSize
81 1 : }
82 :
83 1 : numShards := p.st.Remote.CacheShardCount
84 1 : if numShards == 0 {
85 1 : numShards = 2 * runtime.GOMAXPROCS(0)
86 1 : }
87 :
88 1 : p.remote.cache, err = sharedcache.Open(
89 1 : p.st.FS, p.st.Logger, p.st.FSDirName, blockSize, shardingBlockSize, p.st.Remote.CacheSizeBytes, numShards)
90 1 : if err != nil {
91 0 : return errors.Wrapf(err, "pebble: could not open remote object cache")
92 0 : }
93 : }
94 :
95 2 : for _, meta := range contents.Objects {
96 2 : o := objstorage.ObjectMetadata{
97 2 : DiskFileNum: meta.FileNum,
98 2 : FileType: meta.FileType,
99 2 : }
100 2 : o.Remote.CreatorID = meta.CreatorID
101 2 : o.Remote.CreatorFileNum = meta.CreatorFileNum
102 2 : o.Remote.CleanupMethod = meta.CleanupMethod
103 2 : o.Remote.Locator = meta.Locator
104 2 : o.Remote.CustomObjectName = meta.CustomObjectName
105 2 : o.Remote.Storage, err = p.ensureStorageLocked(o.Remote.Locator)
106 2 : if err != nil {
107 0 : return errors.Wrapf(err, "creating remote.Storage object for locator '%s'", o.Remote.Locator)
108 0 : }
109 2 : if invariants.Enabled {
110 2 : o.AssertValid()
111 2 : }
112 2 : p.mu.knownObjects[o.DiskFileNum] = o
113 : }
114 2 : return nil
115 : }
116 :
117 : // initShared initializes the creator ID, allowing use of shared objects.
118 2 : func (ss *remoteSubsystem) initShared(creatorID objstorage.CreatorID) {
119 2 : ss.shared.initOnce.Do(func() {
120 2 : ss.shared.creatorID = creatorID
121 2 : ss.shared.initialized.Store(true)
122 2 : })
123 : }
124 :
125 2 : func (p *provider) sharedClose() error {
126 2 : if p.st.Remote.StorageFactory == nil {
127 2 : return nil
128 2 : }
129 2 : var err error
130 2 : if p.remote.cache != nil {
131 1 : err = p.remote.cache.Close()
132 1 : p.remote.cache = nil
133 1 : }
134 2 : if p.remote.catalog != nil {
135 2 : err = firstError(err, p.remote.catalog.Close())
136 2 : p.remote.catalog = nil
137 2 : }
138 2 : return err
139 : }
140 :
141 : // SetCreatorID is part of the objstorage.Provider interface.
142 2 : func (p *provider) SetCreatorID(creatorID objstorage.CreatorID) error {
143 2 : if p.st.Remote.StorageFactory == nil {
144 0 : return errors.AssertionFailedf("attempt to set CreatorID but remote storage not enabled")
145 0 : }
146 : // Note: this call is a cheap no-op if the creator ID was already set. This
147 : // call also checks if we are trying to change the ID.
148 2 : if err := p.remote.catalog.SetCreatorID(creatorID); err != nil {
149 0 : return err
150 0 : }
151 2 : if !p.remote.shared.initialized.Load() {
152 2 : p.st.Logger.Infof("remote storage creatorID set to %s", creatorID)
153 2 : p.remote.initShared(creatorID)
154 2 : }
155 2 : return nil
156 : }
157 :
158 : // IsSharedForeign is part of the objstorage.Provider interface.
159 2 : func (p *provider) IsSharedForeign(meta objstorage.ObjectMetadata) bool {
160 2 : if !p.remote.shared.initialized.Load() {
161 2 : return false
162 2 : }
163 2 : return meta.IsShared() && (meta.Remote.CreatorID != p.remote.shared.creatorID)
164 : }
165 :
166 2 : func (p *provider) remoteCheckInitialized() error {
167 2 : if p.st.Remote.StorageFactory == nil {
168 0 : return errors.Errorf("remote object support not configured")
169 0 : }
170 2 : return nil
171 : }
172 :
173 2 : func (p *provider) sharedCheckInitialized() error {
174 2 : if err := p.remoteCheckInitialized(); err != nil {
175 0 : return err
176 0 : }
177 2 : if !p.remote.shared.initialized.Load() {
178 0 : return errors.Errorf("remote object support not available: remote creator ID not yet set")
179 0 : }
180 2 : return nil
181 : }
182 :
183 2 : func (p *provider) sharedSync() error {
184 2 : // Serialize parallel sync operations. Note that ApplyBatch is already
185 2 : // serialized internally, but we want to make sure they get called with
186 2 : // batches in the right order.
187 2 : p.remote.catalogSyncMutex.Lock()
188 2 : defer p.remote.catalogSyncMutex.Unlock()
189 2 :
190 2 : batch := func() remoteobjcat.Batch {
191 2 : p.mu.Lock()
192 2 : defer p.mu.Unlock()
193 2 : res := p.mu.remote.catalogBatch.Copy()
194 2 : p.mu.remote.catalogBatch.Reset()
195 2 : return res
196 2 : }()
197 :
198 2 : if batch.IsEmpty() {
199 2 : return nil
200 2 : }
201 :
202 2 : if err := p.remote.catalog.ApplyBatch(batch); err != nil {
203 0 : // Put back the batch (for the next Sync), appending any operations that
204 0 : // happened in the meantime.
205 0 : p.mu.Lock()
206 0 : defer p.mu.Unlock()
207 0 : batch.Append(p.mu.remote.catalogBatch)
208 0 : p.mu.remote.catalogBatch = batch
209 0 : return err
210 0 : }
211 :
212 2 : return nil
213 : }
214 :
215 2 : func (p *provider) remotePath(meta objstorage.ObjectMetadata) string {
216 2 : if meta.Remote.Locator != "" {
217 1 : return fmt.Sprintf("remote-%s://%s", meta.Remote.Locator, remoteObjectName(meta))
218 1 : }
219 2 : return "remote://" + remoteObjectName(meta)
220 : }
221 :
222 : // sharedCreateRef creates a reference marker object.
223 2 : func (p *provider) sharedCreateRef(meta objstorage.ObjectMetadata) error {
224 2 : if err := p.sharedCheckInitialized(); err != nil {
225 0 : return err
226 0 : }
227 2 : if meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
228 1 : return nil
229 1 : }
230 2 : refName := p.sharedObjectRefName(meta)
231 2 : writer, err := meta.Remote.Storage.CreateObject(refName)
232 2 : if err == nil {
233 2 : // The object is empty, just close the writer.
234 2 : err = writer.Close()
235 2 : }
236 2 : if err != nil {
237 0 : return errors.Wrapf(err, "creating marker object %q", errors.Safe(refName))
238 0 : }
239 2 : return nil
240 : }
241 :
242 : func (p *provider) sharedCreate(
243 : _ context.Context,
244 : fileType base.FileType,
245 : fileNum base.DiskFileNum,
246 : locator remote.Locator,
247 : opts objstorage.CreateOptions,
248 2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
249 2 : if err := p.sharedCheckInitialized(); err != nil {
250 0 : return nil, objstorage.ObjectMetadata{}, err
251 0 : }
252 2 : storage, err := p.ensureStorage(locator)
253 2 : if err != nil {
254 0 : return nil, objstorage.ObjectMetadata{}, err
255 0 : }
256 2 : meta := objstorage.ObjectMetadata{
257 2 : DiskFileNum: fileNum,
258 2 : FileType: fileType,
259 2 : }
260 2 : meta.Remote.CreatorID = p.remote.shared.creatorID
261 2 : meta.Remote.CreatorFileNum = fileNum
262 2 : meta.Remote.CleanupMethod = opts.SharedCleanupMethod
263 2 : meta.Remote.Locator = locator
264 2 : meta.Remote.Storage = storage
265 2 :
266 2 : objName := remoteObjectName(meta)
267 2 : writer, err := storage.CreateObject(objName)
268 2 : if err != nil {
269 0 : return nil, objstorage.ObjectMetadata{}, errors.Wrapf(err, "creating object %q", errors.Safe(objName))
270 0 : }
271 2 : return &sharedWritable{
272 2 : p: p,
273 2 : meta: meta,
274 2 : storageWriter: writer,
275 2 : }, meta, nil
276 : }
277 :
278 : func (p *provider) remoteOpenForReading(
279 : ctx context.Context, meta objstorage.ObjectMetadata, opts objstorage.OpenOptions,
280 2 : ) (objstorage.Readable, error) {
281 2 : if err := p.remoteCheckInitialized(); err != nil {
282 0 : return nil, err
283 0 : }
284 : // Verify we have a reference on this object; for performance reasons, we only
285 : // do this in testing scenarios.
286 2 : if p.remote.shared.checkRefsOnOpen && meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
287 2 : if err := p.sharedCheckInitialized(); err != nil {
288 0 : return nil, err
289 0 : }
290 2 : refName := p.sharedObjectRefName(meta)
291 2 : if _, err := meta.Remote.Storage.Size(refName); err != nil {
292 0 : if meta.Remote.Storage.IsNotExistError(err) {
293 0 : if opts.MustExist {
294 0 : p.st.Logger.Fatalf("marker object %q does not exist", errors.Safe(refName))
295 0 : // TODO(radu): maybe list references for the object.
296 0 : }
297 0 : return nil, errors.Errorf("marker object %q does not exist", errors.Safe(refName))
298 : }
299 0 : return nil, errors.Wrapf(err, "checking marker object %q", errors.Safe(refName))
300 : }
301 : }
302 2 : objName := remoteObjectName(meta)
303 2 : reader, size, err := meta.Remote.Storage.ReadObject(ctx, objName)
304 2 : if err != nil {
305 1 : if opts.MustExist && meta.Remote.Storage.IsNotExistError(err) {
306 0 : p.st.Logger.Fatalf("object %q does not exist", redact.SafeString(objName))
307 0 : // TODO(radu): maybe list references for the object.
308 0 : }
309 1 : return nil, err
310 : }
311 2 : return p.newRemoteReadable(reader, size, meta.DiskFileNum), nil
312 : }
313 :
314 2 : func (p *provider) remoteSize(meta objstorage.ObjectMetadata) (int64, error) {
315 2 : if err := p.remoteCheckInitialized(); err != nil {
316 0 : return 0, err
317 0 : }
318 2 : objName := remoteObjectName(meta)
319 2 : return meta.Remote.Storage.Size(objName)
320 : }
321 :
322 : // sharedUnref implements object "removal" with the remote backend. The ref
323 : // marker object is removed and the backing object is removed only if there are
324 : // no other ref markers.
325 2 : func (p *provider) sharedUnref(meta objstorage.ObjectMetadata) error {
326 2 : if meta.Remote.CleanupMethod == objstorage.SharedNoCleanup {
327 1 : // Never delete objects in this mode.
328 1 : return nil
329 1 : }
330 2 : if p.isProtected(meta.DiskFileNum) {
331 1 : // TODO(radu): we need a mechanism to unref the object when it becomes
332 1 : // unprotected.
333 1 : return nil
334 1 : }
335 :
336 2 : refName := p.sharedObjectRefName(meta)
337 2 : // Tolerate a not-exists error.
338 2 : if err := meta.Remote.Storage.Delete(refName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
339 0 : return err
340 0 : }
341 2 : otherRefs, err := meta.Remote.Storage.List(sharedObjectRefPrefix(meta), "" /* delimiter */)
342 2 : if err != nil {
343 0 : return err
344 0 : }
345 2 : if len(otherRefs) == 0 {
346 2 : objName := remoteObjectName(meta)
347 2 : if err := meta.Remote.Storage.Delete(objName); err != nil && !meta.Remote.Storage.IsNotExistError(err) {
348 0 : return err
349 0 : }
350 : }
351 2 : return nil
352 : }
353 :
354 : // ensureStorageLocked populates the remote.Storage object for the given
355 : // locator, if necessary. p.mu must be held.
356 2 : func (p *provider) ensureStorageLocked(locator remote.Locator) (remote.Storage, error) {
357 2 : if p.mu.remote.storageObjects == nil {
358 2 : p.mu.remote.storageObjects = make(map[remote.Locator]remote.Storage)
359 2 : }
360 2 : if res, ok := p.mu.remote.storageObjects[locator]; ok {
361 2 : return res, nil
362 2 : }
363 2 : res, err := p.st.Remote.StorageFactory.CreateStorage(locator)
364 2 : if err != nil {
365 1 : return nil, err
366 1 : }
367 :
368 2 : p.mu.remote.storageObjects[locator] = res
369 2 : return res, nil
370 : }
371 :
372 : // ensureStorage populates the remote.Storage object for the given locator, if necessary.
373 2 : func (p *provider) ensureStorage(locator remote.Locator) (remote.Storage, error) {
374 2 : p.mu.Lock()
375 2 : defer p.mu.Unlock()
376 2 : return p.ensureStorageLocked(locator)
377 2 : }
|