Skip to content

Commit

Permalink
node/metabase: Unify header metadata processing
Browse files Browse the repository at this point in the history
Unify the object header check before writing to the meta bucket and the
actual write for `Put` and migration routine. Also split header errors
from the BoltDB transaction ones.

Refs #3203.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 6, 2025
1 parent 70d625e commit 316c1c9
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 56 deletions.
63 changes: 34 additions & 29 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,30 +49,19 @@ func invalidMetaBucketKeyErr(key []byte, cause error) error {
return fmt.Errorf("invalid meta bucket key (prefix 0x%X): %w", key[0], cause)
}

func putMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool) error {
owner := hdr.Owner()
if owner.IsZero() {
// checks whether given header corresponds to metadata bucket requirements and
// limits.
func verifyHeaderForMetadata(hdr object.Object) error {
if hdr.GetContainerID().IsZero() {
return fmt.Errorf("invalid container: %w", cid.ErrZero)
}
if hdr.Owner().IsZero() {
return fmt.Errorf("invalid owner: %w", user.ErrZeroID)
}
pldHash, ok := hdr.PayloadChecksum()
if !ok {
if _, ok := hdr.PayloadChecksum(); !ok {
return errors.New("missing payload checksum")
}
var ver version.Version
if v := hdr.Version(); v != nil {
ver = *v
}
var pldHmmHash []byte
if h, ok := hdr.PayloadHomomorphicHash(); ok {
pldHmmHash = h.Value()
}
return putMetadata(tx, hdr.GetContainerID(), hdr.GetID(), ver, owner, hdr.Type(), hdr.CreationEpoch(), hdr.PayloadSize(), pldHash.Value(),
pldHmmHash, hdr.SplitID().ToV2(), hdr.GetParentID(), hdr.GetFirstID(), hdr.Attributes(), hasParent, phy)
}

func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64,
payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute,
hasParent, phy bool) error {
attrs := hdr.Attributes()
for i := range attrs {
if strings.IndexByte(attrs[i].Key(), attributeDelimiter[0]) >= 0 {
return fmt.Errorf("attribute #%d key contains 0x%02X byte used in sep", i, attributeDelimiter[0])
Expand All @@ -81,57 +70,72 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner
return fmt.Errorf("attribute #%d value contains 0x%02X byte used in sep", i, attributeDelimiter[0])
}
}
return nil
}

metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr))
// returns BoltDB errors only.
func putMetadataForObject(tx *bbolt.Tx, hdr object.Object, hasParent, phy bool) error {
metaBkt, err := tx.CreateBucketIfNotExists(metaBucketKey(hdr.GetContainerID()))
if err != nil {
return fmt.Errorf("create meta bucket for container: %w", err)
}
id := hdr.GetID()
idk := [1 + oid.Size]byte{metaPrefixID}
copy(idk[1:], id[:])
if err := metaBkt.Put(idk[:], nil); err != nil {
return fmt.Errorf("put object ID to container's meta bucket: %w", err)
}

var keyBuf keyBuffer
var ver version.Version
if v := hdr.Version(); v != nil {
ver = *v
}
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterVersion, ver.String()); err != nil {
return err
}
owner := hdr.Owner()
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterOwnerID, string(owner[:])); err != nil {
return err
}
typ := hdr.Type()
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterType, typ.String()); err != nil {
return err
}
creationEpoch := hdr.CreationEpoch()
if err = putIntAttribute(metaBkt, &keyBuf, id, object.FilterCreationEpoch, strconv.FormatUint(creationEpoch, 10), new(big.Int).SetUint64(creationEpoch)); err != nil {
return err
}
payloadLen := hdr.PayloadSize()
if err = putIntAttribute(metaBkt, &keyBuf, id, object.FilterPayloadSize, strconv.FormatUint(payloadLen, 10), new(big.Int).SetUint64(payloadLen)); err != nil {
return err
}
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterPayloadChecksum, string(pldHash)); err != nil {
return err
if h, ok := hdr.PayloadChecksum(); ok {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterPayloadChecksum, string(h.Value())); err != nil {
return err
}
}
if len(pldHmmHash) > 0 {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterPayloadHomomorphicHash, string(pldHmmHash)); err != nil {
if h, ok := hdr.PayloadHomomorphicHash(); ok {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterPayloadHomomorphicHash, string(h.Value())); err != nil {
return err
}
}
if len(splitID) > 0 {
if splitID := hdr.SplitID().ToV2(); len(splitID) > 0 {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterSplitID, string(splitID)); err != nil {
return err
}
}
if !firstID.IsZero() {
if firstID := hdr.GetFirstID(); !firstID.IsZero() {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterFirstSplitObject, string(firstID[:])); err != nil {
return err
}
}
if !parentID.IsZero() {
if parentID := hdr.GetParentID(); !parentID.IsZero() {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterParentID, string(parentID[:])); err != nil {
return err
}
}
if !hasParent && typ == object.TypeRegular {
if !hasParent && hdr.Type() == object.TypeRegular {
if err = putPlainAttribute(metaBkt, &keyBuf, id, object.FilterRoot, binPropMarker); err != nil {
return err
}
Expand All @@ -141,6 +145,7 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner
return err
}
}
attrs := hdr.Attributes()
for i := range attrs {
ak, av := attrs[i].Key(), attrs[i].Value()
if n, isInt := parseInt(av); isInt && intWithinLimits(n) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/local_object_storage/metabase/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/nspcc-dev/neofs-sdk-go/user"
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
Expand Down Expand Up @@ -144,10 +145,10 @@ func TestPutMetadata(t *testing.T) {
require.EqualError(t, db.Put(&obj, nil, nil), msg)
}
t.Run("in key", func(t *testing.T) {
testWithAttr(t, "k\x00y", "value", "put metadata: attribute #1 key contains 0x00 byte used in sep")
testWithAttr(t, "k\x00y", "value", "attribute #1 key contains 0x00 byte used in sep")
})
t.Run("in value", func(t *testing.T) {
testWithAttr(t, "key", "va\x00ue", "put metadata: attribute #1 value contains 0x00 byte used in sep")
testWithAttr(t, "key", "va\x00ue", "attribute #1 value contains 0x00 byte used in sep")
})
})
})
Expand Down Expand Up @@ -199,6 +200,7 @@ func TestPutMetadata(t *testing.T) {
var obj object.Object
obj.SetContainerID(cnr)
obj.SetID(id)
obj.SetOwner(usertest.ID()) // Put requires
obj.SetPayloadChecksum(pldHash) // Put requires

require.NoError(t, db.Put(&obj, nil, nil))
Expand Down Expand Up @@ -670,6 +672,7 @@ func TestDB_SearchObjects(t *testing.T) {
for i := range objs {
objs[i].SetContainerID(cnr)
objs[i].SetID(ids[i])
objs[i].SetOwner(usertest.ID()) // required to Put
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // required to Put

err := db.Put(&objs[i], nil, nil)
Expand Down Expand Up @@ -1178,6 +1181,7 @@ func TestDB_SearchObjects(t *testing.T) {
for i := range objs {
objs[i].SetContainerID(cnr)
objs[i].SetID(ids[i])
objs[i].SetOwner(usertest.ID()) // Put requires
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires
require.NoError(t, db.Put(&objs[i], nil, nil))
}
Expand Down Expand Up @@ -1407,6 +1411,7 @@ func TestDB_SearchObjects(t *testing.T) {
cnr := cidtest.ID()
for i := range objs {
objs[i].SetContainerID(cnr)
objs[i].SetOwner(usertest.ID()) // Put requires
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires
require.NoError(t, db.Put(&objs[i], nil, nil))
}
Expand Down Expand Up @@ -1481,6 +1486,7 @@ func TestDB_SearchObjects(t *testing.T) {
for i := range objs {
objs[i].SetID(ids[i])
objs[i].SetContainerID(cnr)
objs[i].SetOwner(usertest.ID()) // Put requires
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires
require.NoError(t, db.Put(&objs[i], nil, nil))
}
Expand Down Expand Up @@ -1539,6 +1545,7 @@ func TestDB_SearchObjects(t *testing.T) {
for i := range objs {
objs[i].SetID(ids[len(ids)-i-1])
objs[i].SetContainerID(cnr)
objs[i].SetOwner(usertest.ID()) // Put requires
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires
require.NoError(t, db.Put(&objs[i], nil, nil))
}
Expand Down Expand Up @@ -1597,6 +1604,7 @@ func TestDB_SearchObjects(t *testing.T) {
if i == 3 {
appendAttribute(&objs[i], object.AttributeExpirationEpoch, "11")
}
objs[i].SetOwner(usertest.ID()) // Put requires
objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires
require.NoError(t, db.Put(&objs[i], nil, nil))
}
Expand Down
28 changes: 4 additions & 24 deletions pkg/local_object_storage/metabase/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -66,17 +64,8 @@ func (db *DB) Put(obj *objectSDK.Object, storageID []byte, binHeader []byte) err
func (db *DB) put(
tx *bbolt.Tx, obj *objectSDK.Object, id []byte,
si *objectSDK.SplitInfo, currEpoch uint64, hdrBin []byte) error {
cnr := obj.GetContainerID()
if cnr.IsZero() {
return errors.New("missing container in object")
}
owner := obj.OwnerID()
if owner == nil {
return user.ErrZeroID
}
pldHash, ok := obj.PayloadChecksum()
if !ok {
return errors.New("missing payload checksum")
if err := verifyHeaderForMetadata(*obj); err != nil {
return err
}

isParent := si != nil
Expand Down Expand Up @@ -143,7 +132,7 @@ func (db *DB) put(

// update container volume size estimation
if obj.Type() == objectSDK.TypeRegular && !isParent {
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
err = changeContainerSize(tx, obj.GetContainerID(), obj.PayloadSize(), true)
if err != nil {
return err
}
Expand All @@ -163,16 +152,7 @@ func (db *DB) put(
}
}

var ver version.Version
if v := obj.Version(); v != nil {
ver = *v
}
var pldHmmHash []byte
if h, ok := obj.PayloadHomomorphicHash(); ok {
pldHmmHash = h.Value()
}
if err := putMetadata(tx, cnr, obj.GetID(), ver, *owner, obj.Type(), obj.CreationEpoch(), obj.PayloadSize(),
pldHash.Value(), pldHmmHash, obj.SplitID().ToV2(), obj.GetParentID(), obj.GetFirstID(), obj.Attributes(), par != nil, !isParent); err != nil {
if err := putMetadataForObject(tx, *obj, par != nil, !isParent); err != nil {
return fmt.Errorf("put metadata: %w", err)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/local_object_storage/metabase/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,18 @@ func migrateFrom3Version(db *DB, tx *bbolt.Tx) error {
zap.Stringer("container", cnr), zap.Stringer("object", id), zap.Binary("data", v))
return nil
}
if err := verifyHeaderForMetadata(hdr); err != nil {
return fmt.Errorf("invalid object %s: %w", id, err)
}
par := hdr.Parent()
hasParent := par != nil
if err := putMetadataForObject(tx, hdr, hasParent, true); err != nil {
return fmt.Errorf("put metadata for object %s: %w", id, err)
}
if hasParent && !par.GetID().IsZero() { // skip the first object without useful info similar to DB.put
if err := verifyHeaderForMetadata(hdr); err != nil {
return fmt.Errorf("invalid parent of object %s: %w", id, err)
}
if err := putMetadataForObject(tx, *par, false, false); err != nil {
return fmt.Errorf("put metadata for parent of object %s: %w", id, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/metabase/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func TestMigrate3to4(t *testing.T) {
// try to migrate
require.EqualError(t, db.Init(), "migrating from meta version 3 failed, consider database resync: "+
"process container 0x6632qzc5qrxpvB1PZam23Xq5AXQ5Kbt2h6G1gtWDb8AzW bucket: "+
"put metadata for object JA1jTW3qwWK9hWs95tesMVbrSLpjCjW6URv8xM7woPnv: "+msg)
"invalid object JA1jTW3qwWK9hWs95tesMVbrSLpjCjW6URv8xM7woPnv: "+msg)
}
t.Run("in key", func(t *testing.T) {
testWithAttr(t, "k\x00y", "value", "attribute #1 key contains 0x00 byte used in sep")
Expand Down

0 comments on commit 316c1c9

Please sign in to comment.