diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index 79c49ba..336c8e4 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -61,17 +61,26 @@ type storage struct { current *os.File currentN uint64 closed bool + + // the sub-partition (queue) to write to / read from, or "" for none. + partition string + // queue partitions to mirror new blobs into (when partition + // above is the empty string) + mirrorPartitions []*storage } func New(root string) (*storage, error) { fi, err := os.Stat(root) - if os.IsNotExist(err) { - return nil, fmt.Errorf("storage root %q doesn't exist", root) - } if err != nil { - return nil, fmt.Errorf("Failed to stat directory %q: %v", root, err) - } - if !fi.IsDir() { + if os.IsNotExist(err) { + log.Printf("Storage root %q doesn't exists, creating it now.", root) + if err = os.MkdirAll(root, 0700); err != nil { + return nil, fmt.Errorf("storage root %q doesn't exist and can't create it: %s", root, err) + } + } else { + return nil, fmt.Errorf("Failed to stat directory %q: %v", root, err) + } + } else if !fi.IsDir() { return nil, fmt.Errorf("storage root %q exists but is not a directory.", root) } index, closer, err := kvfile.NewStorage(filepath.Join(root, "index.kv")) @@ -232,6 +241,7 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit } func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (brGot blob.SizedRef, err error) { + log.Printf("receiving blob %v", br) tempFile, err := ioutil.TempFile("", br.String()) if err != nil { return @@ -267,6 +277,17 @@ func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (brGot blob.SizedRe if err = s.append(sbr, tempFile); err != nil { return } + + for _, mirror := range s.mirrorPartitions { + if _, err = tempFile.Seek(0, 0); err != nil { + return + } + if err = mirror.append(sbr, tempFile); err != nil { + return + } + log.Printf("Mirrored blob %s to partition %q", sbr, mirror.root) + } + return sbr, nil } @@ -341,3 +362,37 @@ func (m blobMeta) String() string { func (m blobMeta) SizedRef(br blob.Ref) blob.SizedRef { return blob.SizedRef{Ref: br, Size: int64(m.size)} } + +func (ds *storage) CreateQueue(name string) (blobserver.Storage, error) { + if ds.partition != "" { + return nil, fmt.Errorf("can't create queue %q on existing queue %q", + name, ds.partition) + } + if !validQueueName(name) { + return nil, fmt.Errorf("invalid queue name %q", name) + } + q, err := New(filepath.Join(ds.root, "partition", "queue-"+name)) + if err != nil { + return nil, err + } + ds.mirrorPartitions = append(ds.mirrorPartitions, q) + return q, nil +} + +func validQueueName(name string) bool { + for _, c := range name { + switch { + case c >= 'a' && c <= 'z': + fallthrough + case c >= 'A' && c <= 'Z': + fallthrough + case c >= '0' && c <= '9': + fallthrough + case c == '-' || c == '_': + continue + default: + return false + } + } + return true +}