Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
16 : "github.com/cockroachdb/pebble/objstorage/remote"
17 : )
18 :
19 : const (
20 : tagCreatorID = 1 // followed by the creator ID (uvarint)
21 : tagCreatorFileNum = 2 // followed by the creator file num (uvarint)
22 : tagCleanupMethod = 3 // followed by the cleanup method (uvarint)
23 : // tagRefCheckID encodes the information for a ref marker that needs to be
24 : // checked when attaching this object to another provider. This is set to the
25 : // creator ID and FileNum for the provider that encodes the backing, and
26 : // allows the "target" provider to check that the "source" provider kept its
27 : // reference on the object alive. It is followed by two uvarints: the creator ID and the file num.
28 : tagRefCheckID = 4
29 : // tagLocator encodes the remote.Locator; if absent the locator is "". It is
30 : // followed by the locator string length and the locator string.
31 : tagLocator = 5
32 : // tagCustomObjectName encodes a custom object name (if present). It is followed by the
33 : // custom name string length and the string.
34 : tagCustomObjectName = 6
35 :
36 : // Any new tags that don't have the tagNotSafeToIgnoreMask bit set must be
37 : // followed by the length of the data (so they can be skipped).
38 :
39 : // Any new tags that have the tagNotSafeToIgnoreMask bit set cause errors if
40 : // they are encountered by earlier code that doesn't know the tag.
41 : tagNotSafeToIgnoreMask = 64
42 : )
43 :
44 : func (p *provider) encodeRemoteObjectBacking(
45 : meta *objstorage.ObjectMetadata,
46 1 : ) (objstorage.RemoteObjectBacking, error) {
47 1 : if !meta.IsRemote() {
48 1 : return nil, errors.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
49 1 : }
50 :
51 1 : buf := make([]byte, 0, binary.MaxVarintLen64*4)
52 1 : buf = binary.AppendUvarint(buf, tagCreatorID)
53 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorID))
54 1 : // TODO(radu): encode file type as well?
55 1 : buf = binary.AppendUvarint(buf, tagCreatorFileNum)
56 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorFileNum.FileNum()))
57 1 : buf = binary.AppendUvarint(buf, tagCleanupMethod)
58 1 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CleanupMethod))
59 1 : if meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
60 1 : buf = binary.AppendUvarint(buf, tagRefCheckID)
61 1 : buf = binary.AppendUvarint(buf, uint64(p.remote.shared.creatorID))
62 1 : buf = binary.AppendUvarint(buf, uint64(meta.DiskFileNum.FileNum()))
63 1 : }
64 1 : if meta.Remote.Locator != "" {
65 1 : buf = binary.AppendUvarint(buf, tagLocator)
66 1 : buf = encodeString(buf, string(meta.Remote.Locator))
67 1 : }
68 1 : if meta.Remote.CustomObjectName != "" {
69 1 : buf = binary.AppendUvarint(buf, tagCustomObjectName)
70 1 : buf = encodeString(buf, meta.Remote.CustomObjectName)
71 1 : }
72 1 : return buf, nil
73 : }
74 :
75 : type remoteObjectBackingHandle struct {
76 : backing objstorage.RemoteObjectBacking
77 : fileNum base.DiskFileNum
78 : p *provider
79 : }
80 :
81 1 : func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
82 1 : if s.backing == nil {
83 1 : return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
84 1 : }
85 1 : return s.backing, nil
86 : }
87 :
88 1 : func (s *remoteObjectBackingHandle) Close() {
89 1 : if s.backing != nil {
90 1 : s.backing = nil
91 1 : s.p.unprotectObject(s.fileNum)
92 1 : }
93 : }
94 :
95 : var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
96 :
97 : // RemoteObjectBacking is part of the objstorage.Provider interface.
98 : func (p *provider) RemoteObjectBacking(
99 : meta *objstorage.ObjectMetadata,
100 1 : ) (objstorage.RemoteObjectBackingHandle, error) {
101 1 : backing, err := p.encodeRemoteObjectBacking(meta)
102 1 : if err != nil {
103 1 : return nil, err
104 1 : }
105 1 : p.protectObject(meta.DiskFileNum)
106 1 : return &remoteObjectBackingHandle{
107 1 : backing: backing,
108 1 : fileNum: meta.DiskFileNum,
109 1 : p: p,
110 1 : }, nil
111 : }
112 :
113 : // CreateExternalObjectBacking is part of the objstorage.Provider interface.
114 : func (p *provider) CreateExternalObjectBacking(
115 : locator remote.Locator, objName string,
116 1 : ) (objstorage.RemoteObjectBacking, error) {
117 1 : var meta objstorage.ObjectMetadata
118 1 : meta.Remote.Locator = locator
119 1 : meta.Remote.CustomObjectName = objName
120 1 : meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
121 1 : return p.encodeRemoteObjectBacking(&meta)
122 1 : }
123 :
124 : type decodedBacking struct { // decodedBacking is the result of decodeRemoteObjectBacking.
125 : meta objstorage.ObjectMetadata // decoded metadata; the decoder leaves meta.Remote.Storage unset
126 : // refToCheck is set only when meta.Remote.CleanupMethod is SharedRefTracking; it holds the creator ID and file num decoded from the tagRefCheckID record.
127 : refToCheck struct {
128 : creatorID objstorage.CreatorID
129 : fileNum base.DiskFileNum
130 : }
131 : }
132 :
133 : // decodeRemoteObjectBacking decodes the remote object metadata.
134 : //
135 : // Note that the meta.Remote.Storage field is not set.
136 : func decodeRemoteObjectBacking(
137 : fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
138 1 : ) (decodedBacking, error) {
139 1 : var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
140 1 : var locator, customObjName string
141 1 : br := bytes.NewReader(buf)
142 1 : for {
143 1 : tag, err := binary.ReadUvarint(br)
144 1 : if err == io.EOF {
145 1 : break
146 : }
147 1 : if err != nil {
148 0 : return decodedBacking{}, err
149 0 : }
150 1 : switch tag {
151 1 : case tagCreatorID:
152 1 : creatorID, err = binary.ReadUvarint(br)
153 :
154 1 : case tagCreatorFileNum:
155 1 : creatorFileNum, err = binary.ReadUvarint(br)
156 :
157 1 : case tagCleanupMethod:
158 1 : cleanupMethod, err = binary.ReadUvarint(br)
159 :
160 1 : case tagRefCheckID:
161 1 : refCheckCreatorID, err = binary.ReadUvarint(br)
162 1 : if err == nil {
163 1 : refCheckFileNum, err = binary.ReadUvarint(br)
164 1 : }
165 :
166 1 : case tagLocator:
167 1 : locator, err = decodeString(br)
168 :
169 1 : case tagCustomObjectName:
170 1 : customObjName, err = decodeString(br)
171 :
172 1 : default:
173 1 : // Ignore unknown tags, unless they're not safe to ignore.
174 1 : if tag&tagNotSafeToIgnoreMask != 0 {
175 1 : return decodedBacking{}, errors.Newf("unknown tag %d", tag)
176 1 : }
177 1 : var dataLen uint64
178 1 : dataLen, err = binary.ReadUvarint(br)
179 1 : if err == nil {
180 1 : _, err = br.Seek(int64(dataLen), io.SeekCurrent)
181 1 : }
182 : }
183 1 : if err != nil {
184 0 : return decodedBacking{}, err
185 0 : }
186 : }
187 1 : if customObjName == "" {
188 1 : if creatorID == 0 {
189 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
190 0 : }
191 1 : if creatorFileNum == 0 {
192 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
193 0 : }
194 : }
195 1 : var res decodedBacking
196 1 : res.meta.DiskFileNum = fileNum
197 1 : res.meta.FileType = fileType
198 1 : res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
199 1 : res.meta.Remote.CreatorFileNum = base.FileNum(creatorFileNum).DiskFileNum()
200 1 : res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
201 1 : if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
202 1 : if refCheckCreatorID == 0 || refCheckFileNum == 0 {
203 0 : return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
204 0 : }
205 1 : res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
206 1 : res.refToCheck.fileNum = base.FileNum(refCheckFileNum).DiskFileNum()
207 : }
208 1 : res.meta.Remote.Locator = remote.Locator(locator)
209 1 : res.meta.Remote.CustomObjectName = customObjName
210 1 : return res, nil
211 : }
212 :
// encodeString appends s to buf as a uvarint length followed by the raw bytes
// of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	return append(binary.AppendUvarint(buf, uint64(len(s))), s...)
}
218 :
// decodeString reads a string in the format written by encodeString: a uvarint
// length followed by that many bytes.
//
// The length comes from the encoded input and is not trusted. The result
// buffer is grown as bytes are actually read, starting from a bounded initial
// capacity, so a corrupt or truncated input produces an error instead of a
// huge allocation or a makeslice panic.
//
// A zero length yields "". If the reader reports io.EOF before the whole
// string has been read, io.ErrUnexpectedEOF is returned; other read errors are
// returned unchanged.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}
	// Cap the up-front allocation; append grows the buffer if the string
	// really is longer than this.
	const maxInitialCap = 1 << 10
	initialCap := length
	if initialCap > maxInitialCap {
		initialCap = maxInitialCap
	}
	buf := make([]byte, 0, initialCap)
	for uint64(len(buf)) < length {
		b, err := br.ReadByte()
		if err != nil {
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
233 :
234 : // AttachRemoteObjects is part of the objstorage.Provider interface.
235 : func (p *provider) AttachRemoteObjects(
236 : objs []objstorage.RemoteObjectToAttach,
237 1 : ) ([]objstorage.ObjectMetadata, error) {
238 1 : decoded := make([]decodedBacking, len(objs))
239 1 : for i, o := range objs {
240 1 : var err error
241 1 : decoded[i], err = decodeRemoteObjectBacking(o.FileType, o.FileNum, o.Backing)
242 1 : if err != nil {
243 0 : return nil, err
244 0 : }
245 1 : decoded[i].meta.Remote.Storage, err = p.ensureStorage(decoded[i].meta.Remote.Locator)
246 1 : if err != nil {
247 1 : return nil, err
248 1 : }
249 : }
250 :
251 : // Create the reference marker objects.
252 : // TODO(radu): parallelize this.
253 1 : for _, d := range decoded {
254 1 : if d.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
255 1 : continue
256 : }
257 1 : if err := p.sharedCreateRef(d.meta); err != nil {
258 0 : // TODO(radu): clean up references previously created in this loop.
259 0 : return nil, err
260 0 : }
261 : // Check the "origin"'s reference.
262 1 : refName := sharedObjectRefName(d.meta, d.refToCheck.creatorID, d.refToCheck.fileNum)
263 1 : if _, err := d.meta.Remote.Storage.Size(refName); err != nil {
264 1 : _ = p.sharedUnref(d.meta)
265 1 : // TODO(radu): clean up references previously created in this loop.
266 1 : if d.meta.Remote.Storage.IsNotExistError(err) {
267 1 : return nil, errors.Errorf("origin marker object %q does not exist;"+
268 1 : " object probably removed from the provider which created the backing", refName)
269 1 : }
270 0 : return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)
271 : }
272 : }
273 :
274 1 : func() {
275 1 : p.mu.Lock()
276 1 : defer p.mu.Unlock()
277 1 : for _, d := range decoded {
278 1 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
279 1 : FileNum: d.meta.DiskFileNum,
280 1 : FileType: d.meta.FileType,
281 1 : CreatorID: d.meta.Remote.CreatorID,
282 1 : CreatorFileNum: d.meta.Remote.CreatorFileNum,
283 1 : CleanupMethod: d.meta.Remote.CleanupMethod,
284 1 : Locator: d.meta.Remote.Locator,
285 1 : })
286 1 : }
287 : }()
288 1 : if err := p.sharedSync(); err != nil {
289 0 : return nil, err
290 0 : }
291 :
292 1 : metas := make([]objstorage.ObjectMetadata, len(decoded))
293 1 : for i, d := range decoded {
294 1 : metas[i] = d.meta
295 1 : }
296 :
297 1 : p.mu.Lock()
298 1 : defer p.mu.Unlock()
299 1 : for _, meta := range metas {
300 1 : p.mu.knownObjects[meta.DiskFileNum] = meta
301 1 : }
302 1 : return metas, nil
303 : }
|