Skip to content

Commit

Permalink
node/metabase: Assert object header does not exceed the size limit
Browse files Browse the repository at this point in the history
Object headers must not exceed 16K. This should be checked before saving
to the metabucket. This also ensures that the BoltDB limits are met: any
key should not exceed 32K.

Refs #3203.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Mar 5, 2025
1 parent 9fa1c6a commit 335aa84
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/local_object_storage/metabase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func invalidMetaBucketKeyErr(key []byte, cause error) error {
// checks whether given header corresponds to metadata bucket requirements and
// limits.
func verifyHeaderForMetadata(hdr object.Object) error {
if ln := hdr.HeaderLen(); ln > object.MaxHeaderLen {
return fmt.Errorf("header len %d exceeds the limit", ln)
}
if hdr.GetContainerID().IsZero() {
return fmt.Errorf("invalid container: %w", cid.ErrZero)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/local_object_storage/metabase/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func assertIntAttr(t testing.TB, mb *bbolt.Bucket, id oid.ID, attr string, origi
assertPrefixedAttrIDPresence(t, mb, id, true, attr, val, true)
}

func TestHeaderLimit(t *testing.T) { require.Less(t, object.MaxHeaderLen, bbolt.MaxKeySize) }

func TestPutMetadata(t *testing.T) {
db := newDB(t)
cnr := cidtest.ID()
Expand Down
47 changes: 47 additions & 0 deletions pkg/local_object_storage/metabase/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,4 +676,51 @@ func TestMigrate3to4(t *testing.T) {
"data": base64.StdEncoding.EncodeToString(invalidProtobuf),
})
})
t.Run("header limit overflow", func(t *testing.T) {
db := newDB(t)
cnr := cidtest.ID()
ids := sortObjectIDs(oidtest.IDs(5))
objs := make([]object.Object, len(ids))
objBins := make([][]byte, len(ids))
for i := range ids {
objs[i].SetContainerID(cnr)
objs[i].SetID(ids[i])
objs[i].SetOwner(usertest.ID())
objs[i].SetPayloadChecksum(checksumtest.Checksum())
objBins[i] = objs[i].Marshal()
}
// store objects and force version#3
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucket(slices.Concat([]byte{primaryPrefix}, cnr[:]))
require.NoError(t, err)
for i := range objBins {
require.NoError(t, b.Put(ids[i][:], objBins[i]))
}
bkt := tx.Bucket([]byte{0x05})
require.NotNil(t, bkt)
require.NoError(t, bkt.Put([]byte("version"), []byte{0x03, 0, 0, 0, 0, 0, 0, 0}))
return nil
}))
// assert all available
resSelect, err := db.Select(cnr, nil)
require.NoError(t, err)
require.Len(t, resSelect, len(ids))
for i := range ids {
require.True(t, slices.ContainsFunc(resSelect, func(addr oid.Address) bool { return addr.Object() == ids[i] }))
}
// corrupt one object
bigAttrVal := make([]byte, 16<<10)
rand.Read(bigAttrVal) //nolint:staticcheck
objs[1].SetAttributes(*object.NewAttribute("attr", base64.StdEncoding.EncodeToString(bigAttrVal))) // preserve valid chars
objBins[1] = objs[1].Marshal()
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(slices.Concat([]byte{primaryPrefix}, cnr[:]))
require.NotNil(t, b)
require.NoError(t, b.Put(ids[1][:], objBins[1]))
return nil
}))
// migrate
require.EqualError(t, db.Init(), fmt.Sprintf("migrating from meta version 3 failed, consider database resync: "+
"process container 0x6%s bucket: invalid object %s: header len %d exceeds the limit", cnr, ids[1], objs[1].HeaderLen()))
})
}

0 comments on commit 335aa84

Please sign in to comment.