Skip to content

Commit

Permalink
node/metabase: Ignore NeoFS protocol violations on metabucket migration
Browse files Browse the repository at this point in the history
Same as 85616ae but for NeoFS protocol
disorders.

Refs #3203.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 5, 2025
1 parent 335aa84 commit 7dbda76
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 10 deletions.
9 changes: 7 additions & 2 deletions pkg/local_object_storage/metabase/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error {
return nil
}
if err := verifyHeaderForMetadata(hdr); err != nil {
return fmt.Errorf("invalid object %s: %w", id, err)
db.log.Info("invalid header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v))
return nil
}
par := hdr.Parent()
hasParent := par != nil
Expand All @@ -149,7 +151,10 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error {
}
if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put
if err := verifyHeaderForMetadata(hdr); err != nil {
return fmt.Errorf("invalid parent of object %s: %w", id, err)
db.log.Info("invalid parent header in the container bucket, ignoring", zap.Error(err),
zap.Stringer("container", cnr), zap.Stringer("child", id),
zap.Stringer("parent", par.GetID()), zap.Binary("data", v))
return nil
}
if err := putMetadataForObject(tx, *par, false, false); err != nil {
return fmt.Errorf("put metadata for parent of object %s: %w", id, err)
Expand Down
53 changes: 45 additions & 8 deletions pkg/local_object_storage/metabase/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,12 @@ func TestMigrate3to4(t *testing.T) {
t.Run("failure", func(t *testing.T) {
t.Run("zero by in attribute", func(t *testing.T) {
testWithAttr := func(t *testing.T, k, v, msg string) {
db := newDB(t)
var logBuf zaptest.Buffer
db := newDB(t, WithLogger(zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
zap.CombineWriteSyncers(&logBuf),
zapcore.InfoLevel,
))))
cnr := cid.ID{74, 207, 174, 156, 40, 231, 114, 55, 114, 92, 232, 152, 106, 247, 193, 112, 158, 52, 3, 52, 184, 14, 75, 215, 86, 203, 76, 88, 158, 253, 241, 195}
id := oid.ID{254, 229, 187, 147, 179, 23, 187, 50, 37, 212, 113, 82, 18, 24, 192, 81, 251, 204, 82, 56, 211, 244, 161, 185, 71, 248, 118, 213, 134, 26, 49, 79}
var obj object.Object
Expand All @@ -598,10 +603,23 @@ func TestMigrate3to4(t *testing.T) {
return bkt.Put([]byte("version"), []byte{0x03, 0, 0, 0, 0, 0, 0, 0})
}))
require.NoError(t, err)
// try to migrate
require.EqualError(t, db.Init(), "migrating from meta version 3 failed, consider database resync: "+
"process container 0x6632qzc5qrxpvB1PZam23Xq5AXQ5Kbt2h6G1gtWDb8AzW bucket: "+
"invalid object JA1jTW3qwWK9hWs95tesMVbrSLpjCjW6URv8xM7woPnv: "+msg)
// migrate
require.NoError(t, db.Init())
// assert ignored
assertSearchResult(t, db, cnr, nil, nil, nil)
// assert log message
msgs := logBuf.Lines()
require.Len(t, msgs, 1)
var m map[string]any
require.NoError(t, json.Unmarshal([]byte(msgs[0]), &m))
require.Subset(t, m, map[string]any{
"level": "info",
"msg": "invalid header in the container bucket, ignoring",
"error": msg,
"container": "632qzc5qrxpvB1PZam23Xq5AXQ5Kbt2h6G1gtWDb8AzW",
"object": "JA1jTW3qwWK9hWs95tesMVbrSLpjCjW6URv8xM7woPnv",
"data": base64.StdEncoding.EncodeToString(obj.Marshal()),
})
}
t.Run("in key", func(t *testing.T) {
testWithAttr(t, "k\x00y", "value", "attribute #1 key contains 0x00 byte used in sep")
Expand Down Expand Up @@ -677,7 +695,12 @@ func TestMigrate3to4(t *testing.T) {
})
})
t.Run("header limit overflow", func(t *testing.T) {
db := newDB(t)
var logBuf zaptest.Buffer
db := newDB(t, WithLogger(zap.New(zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
zap.CombineWriteSyncers(&logBuf),
zapcore.InfoLevel,
))))
cnr := cidtest.ID()
ids := sortObjectIDs(oidtest.IDs(5))
objs := make([]object.Object, len(ids))
Expand Down Expand Up @@ -720,7 +743,21 @@ func TestMigrate3to4(t *testing.T) {
return nil
}))
// migrate
require.EqualError(t, db.Init(), fmt.Sprintf("migrating from meta version 3 failed, consider database resync: "+
"process container 0x6%s bucket: invalid object %s: header len %d exceeds the limit", cnr, ids[1], objs[1].HeaderLen()))
require.NoError(t, db.Init())
// assert all others are available
assertSearchResult(t, db, cnr, nil, nil, searchResultForIDs(slices.Concat(ids[:1], ids[2:])))
// assert log message
msgs := logBuf.Lines()
require.Len(t, msgs, 1)
var m map[string]any
require.NoError(t, json.Unmarshal([]byte(msgs[0]), &m))
require.Subset(t, m, map[string]any{
"level": "info",
"msg": "invalid header in the container bucket, ignoring",
"error": fmt.Sprintf("header len %d exceeds the limit", objs[1].HeaderLen()),
"container": cnr.String(),
"object": ids[1].String(),
"data": base64.StdEncoding.EncodeToString(objBins[1]),
})
})
}

0 comments on commit 7dbda76

Please sign in to comment.