From 27aa52cfcd11bc4f807b9d95453beb181e036bd4 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 6 Mar 2025 12:08:55 +0300 Subject: [PATCH 1/6] node/metabase: Split V3->4 migration func to smaller ones Pure refactoring. This will also decrease code movements on support. Refs #3202. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/version.go | 94 +++++++++++--------- 1 file changed, 53 insertions(+), 41 deletions(-) diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index f5e525d6f2..5b3d6fbf0a 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -118,7 +118,14 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error { if k, _ := c.Seek(pref); bytes.HasPrefix(k, pref) { return fmt.Errorf("key with prefix 0x%X detected, metadata space is occupied by unexpected data or the version has not been updated to #%d", pref, currentMetaVersion) } - err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + if err := migrateContainersToMetaBucket(db.log, db.cfg.containers, tx); err != nil { + return err + } + return updateVersion(tx, 4) +} + +func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { switch name[0] { default: return nil @@ -128,53 +135,58 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error { return fmt.Errorf("invalid container bucket with prefix 0x%X: wrong CID len %d", name[0], len(name[1:])) } cnr := cid.ID(name[1:]) - if exists, err := db.cfg.containers.Exists(cnr); err != nil { + if exists, err := cs.Exists(cnr); err != nil { return fmt.Errorf("check container presence: %w", err) } else if !exists { - db.log.Info("container no longer exists, ignoring", zap.Stringer("container", cnr)) + l.Info("container no longer exists, ignoring", zap.Stringer("container", cnr)) return nil } - err := b.ForEach(func(k, v []byte) error { - if len(k) != oid.Size { - return fmt.Errorf("wrong OID key len %d", len(k)) - } - id := oid.ID(k) - var hdr object.Object - if err := hdr.Unmarshal(v); err != nil { - db.log.Info("invalid object binary in the container bucket's value, ignoring", zap.Error(err), - zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v)) - return nil - } - if err := verifyHeaderForMetadata(hdr); err != nil { - db.log.Info("invalid header in the container bucket, ignoring", zap.Error(err), - zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v)) - return nil - } - par := hdr.Parent() - hasParent := par != nil - if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil { - return fmt.Errorf("put metadata for object %s: %w", id, err) - } - if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put - if err := verifyHeaderForMetadata(hdr); err != nil { - db.log.Info("invalid parent header in the container bucket, ignoring", zap.Error(err), - zap.Stringer("container", cnr), zap.Stringer("child", id), - zap.Stringer("parent", par.GetID()), zap.Binary("data", v)) - return nil - } - if err := putMetadataForObject(tx, *par, false, false); err != nil { - return fmt.Errorf("put metadata for parent of object %s: %w", id, err) - } - } - return nil - }) - if err != nil { + if err := migrateContainerToMetaBucket(l, tx, b.Cursor(), cnr); err != nil { return fmt.Errorf("process container 0x%X%s bucket: %w", name[0], cnr, err) } return nil }) - if err != nil 
{ - return err +} + +func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, cnr cid.ID) error { + for k, v := c.First(); k != nil; k, v = c.Next() { + if err := migrateObjectToMetaBucket(l, tx, cnr, k, v); err != nil { + return err + } } - return updateVersion(tx, 4) + return nil +} + +func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, cnr cid.ID, id, bin []byte) error { + if len(id) != oid.Size { + return fmt.Errorf("wrong OID key len %d", len(id)) + } + var hdr object.Object + if err := hdr.Unmarshal(bin); err != nil { + l.Info("invalid object binary in the container bucket's value, ignoring", zap.Error(err), + zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin)) + return nil + } + if err := verifyHeaderForMetadata(hdr); err != nil { + l.Info("invalid header in the container bucket, ignoring", zap.Error(err), + zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin)) + return nil + } + par := hdr.Parent() + hasParent := par != nil + if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil { + return fmt.Errorf("put metadata for object %s: %w", oid.ID(id), err) + } + if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put + if err := verifyHeaderForMetadata(hdr); err != nil { + l.Info("invalid parent header in the container bucket, ignoring", zap.Error(err), + zap.Stringer("container", cnr), zap.Stringer("child", oid.ID(id)), + zap.Stringer("parent", par.GetID()), zap.Binary("data", bin)) + return nil + } + if err := putMetadataForObject(tx, *par, false, false); err != nil { + return fmt.Errorf("put metadata for parent of object %s: %w", oid.ID(id), err) + } + } + return nil } From b80247dc38f899f1fa8a89067cd3dd73889614e3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 6 Mar 2025 12:34:34 +0300 Subject: [PATCH 2/6] node/metabase: Replace `ForEach` bucket with cursor This will be useful to implement continued (resumable) processing. Another micro-improvement is the delayed `bbolt/Tx.Bucket` call, which is now avoided when there are errors in the bucket name. Refs #3202.
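For illustration, the difference from `Tx.ForEach` is that a cursor can be re-positioned with `Seek`, which is what later makes resumable processing possible. A minimal sketch of the pattern, assuming a hypothetical `process` callback and a saved `lastName` position (neither is part of this patch):

package main

import "go.etcd.io/bbolt"

// iterateBucketNames walks top-level bucket names with a cursor. Unlike
// Tx.ForEach, the walk can be resumed from a previously saved name via Seek.
func iterateBucketNames(tx *bbolt.Tx, lastName []byte, process func(name []byte) error) error {
	c := tx.Cursor()
	var name []byte
	if lastName != nil {
		name, _ = c.Seek(lastName) // resume where the previous run stopped
	} else {
		name, _ = c.First()
	}
	for ; name != nil; name, _ = c.Next() {
		if err := process(name); err != nil {
			return err
		}
	}
	return nil
}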
Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/version.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 5b3d6fbf0a..265fa20a2c 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -125,10 +125,11 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error { } func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx) error { - return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + c := tx.Cursor() + for name, _ := c.First(); name != nil; name, _ = c.Next() { switch name[0] { default: - return nil + continue case primaryPrefix, tombstonePrefix, storageGroupPrefix, lockersPrefix, linkObjectsPrefix: } if len(name[1:]) != cid.Size { @@ -139,13 +140,14 @@ func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx) e return fmt.Errorf("check container presence: %w", err) } else if !exists { l.Info("container no longer exists, ignoring", zap.Stringer("container", cnr)) - return nil + continue } + b := tx.Bucket(name) // must not be nil, bbolt/Tx.ForEach follows the same assumption if err := migrateContainerToMetaBucket(l, tx, b.Cursor(), cnr); err != nil { return fmt.Errorf("process container 0x%X%s bucket: %w", name[0], cnr, err) } - return nil - }) + } + return nil } func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, cnr cid.ID) error { From f105b24ae3839fa1828a7d875d62a57bfd98e790 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 6 Mar 2025 13:44:24 +0300 Subject: [PATCH 3/6] node/metabase: Migrate objects to metabucket in batches Previously, any migration to a newer metabase version was performed within a single BoltDB transaction. Thus, the V3->4 migration tried to move all objects to the metabucket in one tx. Practice has shown that such an approach can delay migration for hours and even days if the objects number in the millions. This brings three changes to the metabase init procedure: 1. the current version number is read in a separate BoltDB RO transaction; 2. any metabase version upgrade is done in separate BoltDB transaction(s); 3. the V3->4 migration writes objects' metadata to the metabucket in batches of 1000 objects. Change 2 was needed to achieve 3. It is also a good change on its own, as upgrading a version remains an atomic operation. Change 3 was tested in practice: it sped up processing of 9KK (9 million) objects from almost a week (!) to half an hour. Closes #3202.
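For illustration only, the driver loop that the diff below implements can be sketched roughly as follows; `migrateBatch` is a hypothetical callback standing in for the real per-batch migration and is not part of the patch:

package main

import "go.etcd.io/bbolt"

// migrateInBatches runs one bounded batch per BoltDB read-write transaction.
// migrateBatch is expected to process at most `limit` objects starting after
// `resume` and to return the next resume position (already copied out of the
// transaction, since bbolt keys are only valid while the tx is open), or nil
// once everything has been migrated.
func migrateInBatches(db *bbolt.DB, limit uint, migrateBatch func(tx *bbolt.Tx, resume []byte, limit uint) ([]byte, error)) error {
	var resume []byte
	for {
		if err := db.Update(func(tx *bbolt.Tx) error {
			var err error
			resume, err = migrateBatch(tx, resume, limit)
			return err
		}); err != nil {
			return err
		}
		if resume == nil {
			return nil // all batches committed
		}
	}
}

Each committed batch persists on its own; patch 4/6 below additionally makes a restarted migration skip objects that earlier batches already covered.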
Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + pkg/local_object_storage/metabase/control.go | 14 +-- pkg/local_object_storage/metabase/version.go | 119 ++++++++++++------ .../metabase/version_test.go | 110 ++++++++++++++++ 4 files changed, 201 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cc68bb336d..c1cf275f80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Changelog for NeoFS Node - IR cache size for handled notary requests (#3205) - Metabase V3->4 migration routine no longer breaks on broken binary or proto-violating object encounter (#3203) - Metabase V3->4 migration routine skips objects from the removed containers (#3203) +- Metabase V3->4 migration routine processes up to 1000 objects per BoltDB transaction and keeps the progress (#3202) ### Removed diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index f08c080cd7..22d88caa08 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -93,15 +93,15 @@ func (db *DB) init(reset bool) error { string(bucketNameLocked): {}, } + if !reset { + // Normal open, check version and update if not initialized. + if err := db.checkVersion(); err != nil { + return err + } + } + return db.boltDB.Update(func(tx *bbolt.Tx) error { var err error - if !reset { - // Normal open, check version and update if not initialized. - err := db.checkVersion(tx) - if err != nil { - return err - } - } for k := range mStaticBuckets { name := []byte(k) if reset { diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 265fa20a2c..596c4764bc 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "slices" objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -25,13 +26,20 @@ var versionKey = []byte("version") // the current code version. 
var ErrOutdatedVersion = logicerr.New("invalid version, resynchronization is required") -func (db *DB) checkVersion(tx *bbolt.Tx) error { - stored, knownVersion := getVersion(tx) +func (db *DB) checkVersion() error { + var stored uint64 + var knownVersion bool + if err := db.boltDB.View(func(tx *bbolt.Tx) error { + stored, knownVersion = getVersion(tx) + return nil + }); err != nil { + return err + } switch { case !knownVersion: // new database, write version - return updateVersion(tx, currentMetaVersion) + return db.boltDB.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, currentMetaVersion) }) case stored == currentMetaVersion: return nil case stored > currentMetaVersion: @@ -45,7 +53,7 @@ func (db *DB) checkVersion(tx *bbolt.Tx) error { return fmt.Errorf("%w: expected=%d, stored=%d", ErrOutdatedVersion, currentMetaVersion, stored) } - err := migrate(db, tx) + err := migrate(db) if err != nil { return fmt.Errorf("migrating from meta version %d failed, consider database resync: %w", i, err) } @@ -77,13 +85,17 @@ func getVersion(tx *bbolt.Tx) (uint64, bool) { return 0, false } -var migrateFrom = map[uint64]func(*DB, *bbolt.Tx) error{ +var migrateFrom = map[uint64]func(*DB) error{ 2: migrateFrom2Version, 3: migrateFrom3Version, } -func migrateFrom2Version(db *DB, tx *bbolt.Tx) error { - tsExpiration := db.epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime +func migrateFrom2Version(db *DB) error { + return db.boltDB.Update(func(tx *bbolt.Tx) error { return migrateFrom2VersionTx(tx, db.epochState) }) +} + +func migrateFrom2VersionTx(tx *bbolt.Tx, epochState EpochState) error { + tsExpiration := epochState.CurrentEpoch() + objectconfig.DefaultTombstoneLifetime bkt := tx.Bucket(graveyardBucketName) if bkt == nil { return errors.New("graveyard bucket is nil") @@ -112,83 +124,118 @@ func migrateFrom2Version(db *DB, tx *bbolt.Tx) error { return updateVersion(tx, 3) } -func migrateFrom3Version(db *DB, tx *bbolt.Tx) error { - c := tx.Cursor() - pref := []byte{metadataPrefix} - if k, _ := c.Seek(pref); bytes.HasPrefix(k, pref) { - return fmt.Errorf("key with prefix 0x%X detected, metadata space is occupied by unexpected data or the version has not been updated to #%d", pref, currentMetaVersion) - } - if err := migrateContainersToMetaBucket(db.log, db.cfg.containers, tx); err != nil { - return err +func migrateFrom3Version(db *DB) error { + var fromBkt, afterObj []byte + for { + if err := db.boltDB.Update(func(tx *bbolt.Tx) error { + var err error + if fromBkt, afterObj, err = migrateContainersToMetaBucket(db.log, db.cfg.containers, tx, fromBkt, afterObj); err == nil { + fromBkt, afterObj = slices.Clone(fromBkt), slices.Clone(afterObj) // needed after the tx lifetime + } + return err + }); err != nil { + return err + } + if fromBkt == nil { + break + } } - return updateVersion(tx, 4) + return db.boltDB.Update(func(tx *bbolt.Tx) error { return updateVersion(tx, 4) }) } -func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx) error { +func migrateContainersToMetaBucket(l *zap.Logger, cs Containers, tx *bbolt.Tx, fromBkt, afterObj []byte) ([]byte, []byte, error) { c := tx.Cursor() - for name, _ := c.First(); name != nil; name, _ = c.Next() { + var name []byte + if fromBkt != nil { + name, _ = c.Seek(fromBkt) + } else { + name, _ = c.First() + } + rem := uint(1000) + var done uint + var err error + for ; name != nil; name, _ = c.Next() { switch name[0] { default: continue case primaryPrefix, tombstonePrefix, storageGroupPrefix, lockersPrefix, 
linkObjectsPrefix: } if len(name[1:]) != cid.Size { - return fmt.Errorf("invalid container bucket with prefix 0x%X: wrong CID len %d", name[0], len(name[1:])) + return nil, nil, fmt.Errorf("invalid container bucket with prefix 0x%X: wrong CID len %d", name[0], len(name[1:])) } cnr := cid.ID(name[1:]) if exists, err := cs.Exists(cnr); err != nil { - return fmt.Errorf("check container presence: %w", err) + return nil, nil, fmt.Errorf("check container presence: %w", err) } else if !exists { l.Info("container no longer exists, ignoring", zap.Stringer("container", cnr)) continue } b := tx.Bucket(name) // must not be nil, bbolt/Tx.ForEach follows the same assumption - if err := migrateContainerToMetaBucket(l, tx, b.Cursor(), cnr); err != nil { - return fmt.Errorf("process container 0x%X%s bucket: %w", name[0], cnr, err) + if done, afterObj, err = migrateContainerToMetaBucket(l, tx, b.Cursor(), cnr, afterObj, rem); err != nil { + return nil, nil, fmt.Errorf("process container 0x%X%s bucket: %w", name[0], cnr, err) } + if done == rem { + break + } + rem -= done } - return nil + return name, afterObj, nil } -func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, cnr cid.ID) error { - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := migrateObjectToMetaBucket(l, tx, cnr, k, v); err != nil { - return err +func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, cnr cid.ID, after []byte, limit uint) (uint, []byte, error) { + var k, v []byte + if after != nil { + if k, v = c.Seek(after); bytes.Equal(k, after) { + k, v = c.Next() } + } else { + k, v = c.First() } - return nil + var done uint + for ; k != nil; k, v = c.Next() { + ok, err := migrateObjectToMetaBucket(l, tx, cnr, k, v) + if err != nil { + return 0, nil, err + } + if ok { + if done++; done == limit { + break + } + } + } + return done, k, nil } -func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, cnr cid.ID, id, bin []byte) error { +func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, cnr cid.ID, id, bin []byte) (bool, error) { if len(id) != oid.Size { - return fmt.Errorf("wrong OID key len %d", len(id)) + return false, fmt.Errorf("wrong OID key len %d", len(id)) } var hdr object.Object if err := hdr.Unmarshal(bin); err != nil { l.Info("invalid object binary in the container bucket's value, ignoring", zap.Error(err), zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin)) - return nil + return false, nil } if err := verifyHeaderForMetadata(hdr); err != nil { l.Info("invalid header in the container bucket, ignoring", zap.Error(err), zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin)) - return nil + return false, nil } par := hdr.Parent() hasParent := par != nil if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil { - return fmt.Errorf("put metadata for object %s: %w", oid.ID(id), err) + return false, fmt.Errorf("put metadata for object %s: %w", oid.ID(id), err) } if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put if err := verifyHeaderForMetadata(hdr); err != nil { l.Info("invalid parent header in the container bucket, ignoring", zap.Error(err), zap.Stringer("container", cnr), zap.Stringer("child", oid.ID(id)), zap.Stringer("parent", par.GetID()), zap.Binary("data", bin)) - return nil + return false, nil } if err := putMetadataForObject(tx, *par, false, false); err != nil { - return fmt.Errorf("put metadata for parent of 
object %s: %w", oid.ID(id), err) + return false, fmt.Errorf("put metadata for parent of object %s: %w", oid.ID(id), err) } } - return nil + return true, nil } diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index 9cf2e619be..89cce0d7f9 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -833,4 +833,114 @@ func TestMigrate3to4(t *testing.T) { // assert all others are available assertSearchResult(t, db, cnr, nil, nil, searchResultForIDs(ids)) }) + t.Run("various object sets", func(t *testing.T) { + for _, tc := range []struct { + name string + m map[object.Type][]uint + }{ + {name: "no objects", m: nil}, + {name: "empty containers only", m: map[object.Type][]uint{ + object.TypeRegular: make([]uint, 3), + object.TypeTombstone: make([]uint, 5), + object.TypeStorageGroup: make([]uint, 10), + object.TypeLock: make([]uint, 1), + object.TypeLink: make([]uint, 100), + }}, + {name: "some containers are empty", m: map[object.Type][]uint{ + object.TypeRegular: {1, 7, 0, 20}, + object.TypeTombstone: {0, 15, 0}, + object.TypeStorageGroup: make([]uint, 10), + }}, + {name: "some containers are empty", m: map[object.Type][]uint{ + object.TypeRegular: {1, 7, 0, 20}, + object.TypeTombstone: {0, 15, 0}, + object.TypeStorageGroup: make([]uint, 10), + }}, + {name: "one big container", m: map[object.Type][]uint{ + object.TypeRegular: {3999}, + }}, + {name: "big counts", m: map[object.Type][]uint{ + object.TypeRegular: {200, 700, 600}, + object.TypeTombstone: {20, 30}, + object.TypeStorageGroup: {10, 0, 20, 0, 30, 0, 40, 0}, + object.TypeLock: {1, 2, 3, 4, 5, 6, 7, 8, 9}, + object.TypeLink: {99}, + }}, + {name: "big counts aligned", m: map[object.Type][]uint{ + object.TypeRegular: {1000}, + object.TypeTombstone: {500, 500, 500}, + object.TypeStorageGroup: {200, 200, 200, 200, 200}, + }}, + {name: "big counts not aligned", m: map[object.Type][]uint{ + object.TypeRegular: {999, 999}, + object.TypeTombstone: {999}, + object.TypeStorageGroup: {999, 999, 999}, + }}, + } { + t.Run(tc.name, func(t *testing.T) { testMigrationV3To4(t, tc.m) }) + } + }) +} + +func TestSlicesCloneNil(t *testing.T) { + // not stated in docs, but migrateContainersToMetaBucket relies on this + require.Nil(t, slices.Clone([]byte(nil))) +} + +func testMigrationV3To4(t *testing.T, mAll map[object.Type][]uint) { + db := newDB(t) + // force version#3 + require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { + bkt := tx.Bucket([]byte{0x05}) + require.NotNil(t, bkt) + require.NoError(t, bkt.Put([]byte("version"), []byte{0x03, 0, 0, 0, 0, 0, 0, 0})) + return nil + })) + // store configured objects + mCnrs := make(map[cid.ID][]oid.ID) + for typ, counts := range mAll { + for _, count := range counts { + cnr := cidtest.ID() + var ids []oid.ID + for range count { + id := oidtest.ID() + require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { + var obj object.Object + obj.SetID(id) + obj.SetContainerID(cnr) + obj.SetOwner(usertest.ID()) + obj.SetPayloadChecksum(checksumtest.Checksum()) + + var prefix byte + switch typ { + default: + t.Fatalf("unexpected object type %v", typ) + case object.TypeRegular: + prefix = 0x06 + case object.TypeTombstone: + prefix = 0x09 + case object.TypeStorageGroup: + prefix = 0x08 + case object.TypeLock: + prefix = 0x07 + case object.TypeLink: + prefix = 0x12 + } + b, err := tx.CreateBucketIfNotExists(slices.Concat([]byte{prefix}, cnr[:])) + require.NoError(t, err) + 
require.NoError(t, b.Put(id[:], obj.Marshal())) + return nil + })) + ids = append(ids, id) + } + mCnrs[cnr] = ids + } + } + // migrate + require.NoError(t, db.Init()) + // TODO: would also be nice to check tx num which is known + // check all objects are available + for cnr, ids := range mCnrs { + assertSearchResult(t, db, cnr, nil, nil, searchResultForIDs(sortObjectIDs(ids))) + } } From 58bdd7827d6b433a7dbd25460371bea6db885e9d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 6 Mar 2025 16:49:26 +0300 Subject: [PATCH 4/6] node/metabase: Ignore objects already migrated to metabucket earlier With f105b24ae3839fa1828a7d875d62a57bfd98e790, metabase migration V3->4 splits the whole process into 1000-object batches. Each batch is done in a single BoltDB transaction. Transaction multiplicity automatically introduced the possibility of partial filling of the metabucket, for example, when the node suffered an accidental shutdown. Therefore, when the migration procedure is restarted, some objects are already in place, yet they were not skipped. It is worth noting, though, that repeating the object write is not an error, just a no-op in terms of storage. This makes the migrator check whether the next object has already been stored in the metabucket, and skip it if so. Refs #3202. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/version.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 596c4764bc..86c4518534 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -191,9 +191,10 @@ func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, } else { k, v = c.First() } + metaBkt := tx.Bucket(metaBucketKey(cnr)) // may be nil var done uint for ; k != nil; k, v = c.Next() { - ok, err := migrateObjectToMetaBucket(l, tx, cnr, k, v) + ok, err := migrateObjectToMetaBucket(l, tx, metaBkt, cnr, k, v) if err != nil { return 0, nil, err } @@ -206,7 +207,7 @@ func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, return done, k, nil } -func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, cnr cid.ID, id, bin []byte) (bool, error) { +func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, metaBkt *bbolt.Bucket, cnr cid.ID, id, bin []byte) (bool, error) { if len(id) != oid.Size { return false, fmt.Errorf("wrong OID key len %d", len(id)) } @@ -221,6 +222,13 @@ func migrateObjectToMetaBucket(l *zap.Logger, tx *bbolt.Tx, cnr cid.ID, id, bin zap.Stringer("container", cnr), zap.Stringer("object", oid.ID(id)), zap.Binary("data", bin)) return false, nil } + if metaBkt != nil { + key := [1 + oid.Size]byte{metaPrefixID} + copy(key[1:], id) + if metaBkt.Get(key[:]) != nil { + return false, nil + } + } par := hdr.Parent() hasParent := par != nil if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil { From a5fa0b1793e07325f5f418a02573174b54a606f2 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 6 Mar 2025 18:25:40 +0300 Subject: [PATCH 5/6] node: Interrupt metabase V3->4 process by OS signal f105b24ae3839fa1828a7d875d62a57bfd98e790 opened up the ability to interrupt the migration process quickly and gracefully. This binds the migrator to the OS signal-tracking SN application context. Once the context is done, the migrator does not spawn the next BoltDB transaction. Refs #3202.
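The interruption point can be sketched roughly as below; `ctx` stands for the application context wired in by this patch, and `runBatch` is a placeholder for one batch transaction (both names are illustrative, not the actual NeoFS code):

package main

import (
	"context"

	"go.etcd.io/bbolt"
)

// migrateWithContext stops between batches: a batch that has already started
// is allowed to commit, but no new transaction is opened after cancellation.
func migrateWithContext(ctx context.Context, db *bbolt.DB, runBatch func(tx *bbolt.Tx) (done bool, err error)) error {
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx) // report why the migration was interrupted
		default:
		}
		var done bool
		if err := db.Update(func(tx *bbolt.Tx) error {
			var err error
			done, err = runBatch(tx)
			return err
		}); err != nil {
			return err
		}
		if done {
			return nil
		}
	}
}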
Signed-off-by: Leonard Lyubich --- CHANGELOG.md | 1 + cmd/neofs-node/storage.go | 1 + pkg/local_object_storage/metabase/db.go | 11 +++++++++++ pkg/local_object_storage/metabase/version.go | 6 ++++++ 4 files changed, 19 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1cf275f80..8b683b90b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Changelog for NeoFS Node - Metabase V3->4 migration routine no longer breaks on broken binary or proto-violating object encounter (#3203) - Metabase V3->4 migration routine skips objects from the removed containers (#3203) - Metabase V3->4 migration routine processes up to 1000 objects per BoltDB transaction and keeps the progress (#3202) +- Metabase V3->4 migration routine stops gracefully on OS signal (#3202) ### Removed diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index 506a171545..27a1ea34dd 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -185,6 +185,7 @@ func (c *cfg) shardOpts() []shardOptsWithID { meta.WithLogger(c.log), meta.WithEpochState(c.cfgNetmap.state), meta.WithContainers(containerPresenceChecker{src: c.cfgObject.cnrSource}), + meta.WithInitContext(c.ctx), ), shard.WithPiloramaOptions(piloramaOpts...), shard.WithWriteCache(shCfg.WritecacheCfg.Enabled), diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 20afabed99..97d3b9779c 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -2,6 +2,7 @@ package meta import ( "bytes" + "context" "encoding/binary" "encoding/hex" "io/fs" @@ -65,6 +66,8 @@ type cfg struct { epochState EpochState containers Containers + + initCtx context.Context } func defaultCfg() *cfg { @@ -75,6 +78,7 @@ func defaultCfg() *cfg { boltBatchDelay: bbolt.DefaultMaxBatchDelay, boltBatchSize: bbolt.DefaultMaxBatchSize, log: zap.L(), + initCtx: context.Background(), } } @@ -357,3 +361,10 @@ func WithEpochState(s EpochState) Option { // WithContainers return option to specify container source. func WithContainers(cs Containers) Option { return func(c *cfg) { c.containers = cs } } + +// WithInitContext allows to specify context for [DB.Init] operation. +func WithInitContext(ctx context.Context) Option { + // TODO: accept context parameter instead. Current approach allowed to reduce + // code changes. + return func(c *cfg) { c.initCtx = ctx } +} diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 86c4518534..377130cd7e 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -2,6 +2,7 @@ package meta import ( "bytes" + "context" "encoding/binary" "errors" "fmt" @@ -127,6 +128,11 @@ func migrateFrom2VersionTx(tx *bbolt.Tx, epochState EpochState) error { func migrateFrom3Version(db *DB) error { var fromBkt, afterObj []byte for { + select { + case <-db.initCtx.Done(): + return context.Cause(db.initCtx) + default: + } if err := db.boltDB.Update(func(tx *bbolt.Tx) error { var err error if fromBkt, afterObj, err = migrateContainersToMetaBucket(db.log, db.cfg.containers, tx, fromBkt, afterObj); err == nil { From eea3ce1bc73cda70c93f4c51644f5abcf0267354 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Mar 2025 21:33:39 +0300 Subject: [PATCH 6/6] *: deobfuscate some if statements We have enough complexity in this code base already, no need to add more with things like this. 
Signed-off-by: Roman Khimov --- pkg/core/object/metadata.go | 3 ++- pkg/local_object_storage/blobstor/iterate_test.go | 3 ++- pkg/local_object_storage/metabase/version.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/core/object/metadata.go b/pkg/core/object/metadata.go index 48b4510e69..351c8d66dd 100644 --- a/pkg/core/object/metadata.go +++ b/pkg/core/object/metadata.go @@ -157,7 +157,8 @@ func calcMaxUniqueSearchResults(lim uint16, sets [][]client.SearchResultItem) ui } } } - if n++; n == lim { + n++ + if n == lim { return n } } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index 19cfd9b159..1e115db81c 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -192,7 +192,8 @@ func TestIterate_IgnoreErrors(t *testing.T) { // n := 0 // expectedErr := errors.New("expected error") // prm.SetIterationHandler(func(e IterationElement) error { - // if n++; n == objCount/2 { + // n++ + // if n == objCount/2 { // return expectedErr // } // return nil diff --git a/pkg/local_object_storage/metabase/version.go b/pkg/local_object_storage/metabase/version.go index 377130cd7e..bcebb7b849 100644 --- a/pkg/local_object_storage/metabase/version.go +++ b/pkg/local_object_storage/metabase/version.go @@ -205,7 +205,8 @@ func migrateContainerToMetaBucket(l *zap.Logger, tx *bbolt.Tx, c *bbolt.Cursor, return 0, nil, err } if ok { - if done++; done == limit { + done++ + if done == limit { break } }