Skip to content

Commit

Permalink
Interruptible and continuous migration of metabase to metabucket (#3209)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Mar 6, 2025
2 parents e593f30 + eea3ce1 commit 4e74f0d
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 67 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Changelog for NeoFS Node
- IR cache size for handled notary requests (#3205)
- Metabase V3->4 migration routine no longer breaks on broken binary or proto-violating object encounter (#3203)
- Metabase V3->4 migration routine skips objects from the removed containers (#3203)
- Metabase V3->4 migration routine processes up to 1000 objects per BoltDB transaction and keeps the progress (#3202)
- Metabase V3->4 migration routine stops gracefully on OS signal (#3202)

### Removed

Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (c *cfg) shardOpts() []shardOptsWithID {
meta.WithLogger(c.log),
meta.WithEpochState(c.cfgNetmap.state),
meta.WithContainers(containerPresenceChecker{src: c.cfgObject.cnrSource}),
meta.WithInitContext(c.ctx),
),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(shCfg.WritecacheCfg.Enabled),
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/object/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ func calcMaxUniqueSearchResults(lim uint16, sets [][]client.SearchResultItem) ui
}
}
}
if n++; n == lim {
n++
if n == lim {
return n
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/local_object_storage/blobstor/iterate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func TestIterate_IgnoreErrors(t *testing.T) {
// n := 0
// expectedErr := errors.New("expected error")
// prm.SetIterationHandler(func(e IterationElement) error {
// if n++; n == objCount/2 {
// n++
// if n == objCount/2 {
// return expectedErr
// }
// return nil
Expand Down
14 changes: 7 additions & 7 deletions pkg/local_object_storage/metabase/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ func (db *DB) init(reset bool) error {
string(bucketNameLocked): {},
}

if !reset {
// Normal open, check version and update if not initialized.
if err := db.checkVersion(); err != nil {
return err
}
}

return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
if !reset {
// Normal open, check version and update if not initialized.
err := db.checkVersion(tx)
if err != nil {
return err
}
}
for k := range mStaticBuckets {
name := []byte(k)
if reset {
Expand Down
11 changes: 11 additions & 0 deletions pkg/local_object_storage/metabase/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package meta

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"io/fs"
Expand Down Expand Up @@ -65,6 +66,8 @@ type cfg struct {

epochState EpochState
containers Containers

initCtx context.Context
}

func defaultCfg() *cfg {
Expand All @@ -75,6 +78,7 @@ func defaultCfg() *cfg {
boltBatchDelay: bbolt.DefaultMaxBatchDelay,
boltBatchSize: bbolt.DefaultMaxBatchSize,
log: zap.L(),
initCtx: context.Background(),
}
}

Expand Down Expand Up @@ -357,3 +361,10 @@ func WithEpochState(s EpochState) Option {

// WithContainers returns an option that sets the container source of the
// metabase configuration.
func WithContainers(cs Containers) Option {
	return func(c *cfg) {
		c.containers = cs
	}
}

// WithInitContext returns an option that sets the context for the [DB.Init]
// operation. The given context replaces the context.Background() default.
func WithInitContext(ctx context.Context) Option {
	// TODO: accept context parameter instead. Current approach allowed to reduce
	// code changes.
	setInitCtx := func(c *cfg) {
		c.initCtx = ctx
	}
	return setInitCtx
}
192 changes: 134 additions & 58 deletions pkg/local_object_storage/metabase/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package meta

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"slices"

objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
Expand All @@ -25,13 +27,20 @@ var versionKey = []byte("version")
// the current code version.
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required")

func (db *DB) checkVersion(tx *bbolt.Tx) error {
stored, knownVersion := getVersion(tx)
func (db *DB) checkVersion() error {
var stored uint64
var knownVersion bool
if err := db.boltDB.View(func(tx *bbolt.Tx) error {
stored, knownVersion = getVersion(tx)
return nil
}); err != nil {
return err
}

switch {
case !knownVersion:
// new database, write version
return updateVersion(tx, currentMetaVersion)
return db.boltDB.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, currentMetaVersion) })
case stored == currentMetaVersion:
return nil
case stored > currentMetaVersion:
Expand All @@ -45,7 +54,7 @@ func (db *DB) checkVersion(tx *bbolt.Tx) error {
return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, currentMetaVersion, stored)
}

err := migrate(db, tx)
err := migrate(db)
if err != nil {
return fmt.Errorf("migrating from meta version %d failed, consider database resync: %w", i, err)
}
Expand Down Expand Up @@ -77,13 +86,17 @@ func getVersion(tx *bbolt.Tx) (uint64, bool) {
return 0, false
}

var migrateFrom = map[uint64]func(*DB, *bbolt.Tx) error{
// migrateFrom lists the available migration routines. Keys are the meta
// version a routine upgrades from. Each routine works with the DB handle and
// manages its own BoltDB transactions.
var migrateFrom = map[uint64]func(*DB) error{
2: migrateFrom2Version,
3: migrateFrom3Version,
}

func migrateFrom2Version(db *DB, tx *bbolt.Tx) error {
tsExpiration := db.epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime
// migrateFrom2Version runs the version 2 migration inside a single
// read-write BoltDB transaction, passing the DB's epoch state to it.
func migrateFrom2Version(db *DB) error {
	runTx := func(tx *bbolt.Tx) error {
		return migrateFrom2VersionTx(tx, db.epochState)
	}
	return db.boltDB.Update(runTx)
}

func migrateFrom2VersionTx(tx *bbolt.Tx, epochState EpochState) error {
tsExpiration := epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil {
return errors.New("graveyard bucket is nil")
Expand Down Expand Up @@ -112,69 +125,132 @@ func migrateFrom2Version(db *DB, tx *bbolt.Tx) error {
return updateVersion(tx, 3)
}

func migrateFrom3Version(db *DB, tx *bbolt.Tx) error {
// migrateFrom3Version migrates container buckets to the metabucket in a series
// of separate read-write transactions, so that the committed progress survives
// an interruption. The resume position (bucket name and object key) is carried
// from one transaction to the next. Before each transaction the init context
// is checked; when it is done, the cancellation cause is returned. Once a pass
// reports no bucket left to continue from, version 4 is written in a final
// transaction.
func migrateFrom3Version(db *DB) error {
	var bktPos, objPos []byte
	for {
		if db.initCtx.Err() != nil {
			return context.Cause(db.initCtx)
		}
		err := db.boltDB.Update(func(tx *bbolt.Tx) error {
			nextBkt, nextObj, err := migrateContainersToMetaBucket(db.log, db.cfg.containers, tx, bktPos, objPos)
			if err != nil {
				return err
			}
			// The returned slices are copied because they are needed after the
			// tx lifetime.
			bktPos, objPos = slices.Clone(nextBkt), slices.Clone(nextObj)
			return nil
		})
		if err != nil {
			return err
		}
		if bktPos == nil {
			// Nothing left to resume from: all buckets have been walked.
			return db.boltDB.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, 4) })
		}
	}
}

func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx, fromBkt, afterObj []byte) ([]byte, []byte, error) {
c := tx.Cursor()
pref := []byte{metadataPrefix}
if k, _ := c.Seek(pref); bytes.HasPrefix(k, pref) {
return fmt.Errorf("key with prefix 0x%X detected, metadata space is occupied by unexpected data or the version has not been updated to #%d", pref, currentMetaVersion)
var name []byte
if fromBkt != nil {
name, _ = c.Seek(fromBkt)
} else {
name, _ = c.First()
}
err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
rem := uint(1000)
var done uint
var err error
for ; name != nil; name, _ = c.Next() {
switch name[0] {
default:
return nil
continue
case primaryPrefix, tombstonePrefix, storageGroupPrefix, lockersPrefix, linkObjectsPrefix:
}
if len(name[1:]) != cid.Size {
return fmt.Errorf("invalid container bucket with prefix 0x%X: wrong CID len %d", name[0], len(name[1:]))
return nil, nil, fmt.Errorf("invalid container bucket with prefix 0x%X: wrong CID len %d", name[0], len(name[1:]))
}
cnr := cid.ID(name[1:])
if exists, err := db.cfg.containers.Exists(cnr); err != nil {
return fmt.Errorf("check container presence: %w", err)
if exists, err := cs.Exists(cnr); err != nil {
return nil, nil, fmt.Errorf("check container presence: %w", err)
} else if !exists {
db.log.Info("container no longer exists, ignoring", zap.Stringer("container", cnr))
return nil
l.Info("container no longer exists, ignoring", zap.Stringer("container", cnr))
continue
}
err := b.ForEach(func(k, v []byte) error {
if len(k) != oid.Size {
return fmt.Errorf("wrong OID key len %d", len(k))
}
id := oid.ID(k)
var hdr object.Object
if err := hdr.Unmarshal(v); err != nil {
db.log.Info("invalid object binary in the container bucket's value, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v))
return nil
}
if err := verifyHeaderForMetadata(hdr); err != nil {
db.log.Info("invalid header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v))
return nil
}
par := hdr.Parent()
hasParent := par != nil
if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil {
return fmt.Errorf("put metadata for object %s: %w", id, err)
}
if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put
if err := verifyHeaderForMetadata(hdr); err != nil {
db.log.Info("invalid parent header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("child", id),
zap.Stringer("parent", par.GetID()), zap.Binary("data", v))
return nil
}
if err := putMetadataForObject(tx, *par, false, false); err != nil {
return fmt.Errorf("put metadata for parent of object %s: %w", id, err)
}
}
return nil
})
b := tx.Bucket(name) // must not be nil, bbolt/Tx.ForEach follows the same assumption
if done, afterObj, err = migrateContainerToMetaBucket(l, tx, b.Cursor(), cnr, afterObj, rem); err != nil {
return nil, nil, fmt.Errorf("process container 0x%X%s bucket: %w", name[0], cnr, err)
}
if done == rem {
break
}
rem -= done
}
return name, afterObj, nil
}

// migrateContainerToMetaBucket migrates objects of container cnr read through
// cursor c, stopping once limit objects have been migrated.
//
// When after is non-nil, iteration continues from the key following it (or
// from the first key not less than it if after itself is absent); otherwise it
// starts from the first key. Only objects for which the per-object routine
// reports success count towards limit.
//
// It returns the number of migrated objects and the key the iteration stopped
// on: the last processed key when limit is hit, nil when the bucket is
// exhausted.
func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, cnr cid.ID, after []byte, limit uint) (uint, []byte, error) {
	var key, val []byte
	if after == nil {
		key, val = c.First()
	} else {
		key, val = c.Seek(after)
		if bytes.Equal(key, after) {
			// The key was already handled by the previous pass, step over it.
			key, val = c.Next()
		}
	}
	metaBkt := tx.Bucket(metaBucketKey(cnr)) // may be nil
	var migrated uint
	for key != nil {
		ok, err := migrateObjectToMetaBucket(l, tx, metaBkt, cnr, key, val)
		if err != nil {
			return 0, nil, err
		}
		if ok {
			migrated++
			if migrated == limit {
				break
			}
		}
		key, val = c.Next()
	}
	return migrated, key, nil
}

func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, metaBkt *bbolt.Bucket, cnr cid.ID, id, bin []byte) (bool, error) {
if len(id) != oid.Size {
return false, fmt.Errorf("wrong OID key len %d", len(id))
}
var hdr object.Object
if err := hdr.Unmarshal(bin); err != nil {
l.Info("invalid object binary in the container bucket's value, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin))
return false, nil
}
if err := verifyHeaderForMetadata(hdr); err != nil {
l.Info("invalid header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin))
return false, nil
}
if metaBkt != nil {
key := [1 + oid.Size]byte{metaPrefixID}
copy(key[1:], id)
if metaBkt.Get(key[:]) != nil {
return false, nil
}
}
par := hdr.Parent()
hasParent := par != nil
if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil {
return false, fmt.Errorf("put metadata for object %s: %w", oid.ID(id), err)
}
if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put
if err := verifyHeaderForMetadata(hdr); err != nil {
l.Info("invalid parent header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("child", oid.ID(id)),
zap.Stringer("parent", par.GetID()), zap.Binary("data", bin))
return false, nil
}
if err := putMetadataForObject(tx, *par, false, false); err != nil {
return false, fmt.Errorf("put metadata for parent of object %s: %w", oid.ID(id), err)
}
return nil
})
if err != nil {
return err
}
return updateVersion(tx, 4)
return true, nil
}
Loading

0 comments on commit 4e74f0d

Please sign in to comment.