Skip to content

Commit 19878bf

Browse files
committed
add MoveSubBucket test
Signed-off-by: Mustafa Elbehery <melbeher@redhat.com>
1 parent 3179540 commit 19878bf

File tree

4 files changed

+345
-3
lines changed

4 files changed

+345
-3
lines changed

bucket.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ func (b *Bucket) DeleteBucket(key []byte) error {
297297
// Returns an error if
298298
// 1. the sub-bucket cannot be found in the source bucket;
299299
// 2. or the key already exists in the destination bucket;
300-
// 3. the key represents a non-bucket value.
300+
// 3. or the key represents a non-bucket value;
301+
// 4. the source and destination buckets are the same.
301302
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
302303
if b.tx.db == nil || dstBucket.tx.db == nil {
303304
return errors.ErrTxClosed
@@ -319,7 +320,7 @@ func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
319320
// Do nothing (return true directly) if the source bucket and the
320321
// destination bucket are actually the same bucket.
321322
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
322-
return nil
323+
return fmt.Errorf("source bucket %s and target bucket %s are the same: %w", b.String(), dstBucket.String(), errors.ErrSameBuckets)
323324
}
324325

325326
// check whether the key already exists in the destination bucket

errors/errors.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ var (
6969
// ErrValueTooLarge is returned when inserting a value that is larger than MaxValueSize.
7070
ErrValueTooLarge = errors.New("value too large")
7171

72-
// ErrIncompatibleValue is returned when trying create or delete a bucket
72+
// ErrIncompatibleValue is returned when trying to create or delete a bucket
7373
// on an existing non-bucket key or when trying to create or delete a
7474
// non-bucket key on an existing bucket key.
7575
ErrIncompatibleValue = errors.New("incompatible value")
76+
77+
// ErrSameBuckets is returned when trying to move a sub-bucket between
78+
// source and target buckets, while source and target buckets are the same.
79+
ErrSameBuckets = errors.New("the source and target are the same bucket")
7680
)

movebucket_test.go

+291
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package bbolt_test
2+
3+
import (
4+
"bytes"
5+
crand "crypto/rand"
6+
"math/rand"
7+
"os"
8+
"testing"
9+
10+
"go.etcd.io/bbolt"
11+
"go.etcd.io/bbolt/errors"
12+
"go.etcd.io/bbolt/internal/btesting"
13+
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestTx_MoveBucket(t *testing.T) {
18+
testCases := []struct {
19+
name string
20+
srcBucketPath []string
21+
dstBucketPath []string
22+
bucketToMove string
23+
incompatibleKeyInSrc bool
24+
incompatibleKeyInDst bool
25+
parentSrc bool
26+
parentDst bool
27+
expActErr error
28+
}{
29+
{
30+
name: "happy path",
31+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
32+
dstBucketPath: []string{"db1", "db2"},
33+
bucketToMove: "sb3ToMove",
34+
incompatibleKeyInSrc: false,
35+
incompatibleKeyInDst: false,
36+
parentSrc: true,
37+
parentDst: false,
38+
expActErr: nil,
39+
},
40+
{
41+
name: "bucketToMove not exist in srcBucket",
42+
srcBucketPath: []string{"sb1", "sb2"},
43+
dstBucketPath: []string{"db1", "db2"},
44+
bucketToMove: "sb3ToMove",
45+
incompatibleKeyInSrc: false,
46+
incompatibleKeyInDst: false,
47+
parentSrc: false,
48+
parentDst: false,
49+
expActErr: errors.ErrBucketNotFound,
50+
},
51+
{
52+
name: "bucketToMove exist in dstBucket",
53+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
54+
dstBucketPath: []string{"db1", "db2", "sb3ToMove"},
55+
bucketToMove: "sb3ToMove",
56+
incompatibleKeyInSrc: false,
57+
incompatibleKeyInDst: false,
58+
parentSrc: true,
59+
parentDst: true,
60+
expActErr: errors.ErrBucketExists,
61+
},
62+
{
63+
name: "bucketToMove key exist in srcBucket but no subBucket value",
64+
srcBucketPath: []string{"sb1", "sb2"},
65+
dstBucketPath: []string{"db1", "db2"},
66+
bucketToMove: "sb3ToMove",
67+
incompatibleKeyInSrc: true,
68+
incompatibleKeyInDst: false,
69+
parentSrc: true,
70+
parentDst: false,
71+
expActErr: errors.ErrIncompatibleValue,
72+
},
73+
{
74+
name: "bucketToMove key exist in dstBucket but no subBucket value",
75+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
76+
dstBucketPath: []string{"db1", "db2"},
77+
bucketToMove: "sb3ToMove",
78+
incompatibleKeyInSrc: false,
79+
incompatibleKeyInDst: true,
80+
parentSrc: true,
81+
parentDst: true,
82+
expActErr: errors.ErrIncompatibleValue,
83+
},
84+
{
85+
name: "srcBucket is rootBucket",
86+
srcBucketPath: []string{"", "sb3ToMove"},
87+
dstBucketPath: []string{"db1", "db2"},
88+
bucketToMove: "sb3ToMove",
89+
incompatibleKeyInSrc: false,
90+
incompatibleKeyInDst: false,
91+
parentSrc: true,
92+
parentDst: false,
93+
expActErr: nil,
94+
},
95+
{
96+
name: "dstBucket is rootBucket",
97+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
98+
dstBucketPath: []string{""},
99+
bucketToMove: "sb3ToMove",
100+
incompatibleKeyInSrc: false,
101+
incompatibleKeyInDst: false,
102+
parentSrc: true,
103+
parentDst: false,
104+
expActErr: nil,
105+
},
106+
{
107+
name: "srcBucket is rootBucket and dstBucket is rootBucket",
108+
srcBucketPath: []string{"", "sb3ToMove"},
109+
dstBucketPath: []string{""},
110+
bucketToMove: "sb3ToMove",
111+
incompatibleKeyInSrc: false,
112+
incompatibleKeyInDst: false,
113+
parentSrc: false,
114+
parentDst: false,
115+
expActErr: errors.ErrSameBuckets,
116+
},
117+
}
118+
119+
for _, tc := range testCases {
120+
121+
t.Run(tc.name, func(*testing.T) {
122+
db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: pageSize})
123+
124+
dumpBucketBeforeMoving := tempfile()
125+
dumpBucketAfterMoving := tempfile()
126+
127+
// arrange
128+
if err := db.Update(func(tx *bbolt.Tx) error {
129+
srcBucket := openBuckets(t, tx, tc.incompatibleKeyInSrc, true, false, tc.srcBucketPath...)
130+
dstBucket := openBuckets(t, tx, tc.incompatibleKeyInDst, true, false, tc.dstBucketPath...)
131+
132+
if tc.incompatibleKeyInSrc {
133+
if pErr := srcBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
134+
t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", srcBucket, pErr)
135+
}
136+
}
137+
138+
if tc.incompatibleKeyInDst {
139+
if pErr := dstBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
140+
t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", dstBucket, pErr)
141+
}
142+
}
143+
144+
return nil
145+
}); err != nil {
146+
t.Fatal(err)
147+
}
148+
db.MustCheck()
149+
150+
// act
151+
if err := db.Update(func(tx *bbolt.Tx) error {
152+
srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
153+
dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
154+
155+
var bucketToMove *bbolt.Bucket
156+
if srcBucket != nil {
157+
bucketToMove = srcBucket.Bucket([]byte(tc.bucketToMove))
158+
} else {
159+
bucketToMove = tx.Bucket([]byte(tc.bucketToMove))
160+
}
161+
162+
if tc.expActErr == nil && bucketToMove != nil {
163+
if wErr := dumpBucket([]byte(tc.bucketToMove), bucketToMove, dumpBucketBeforeMoving); wErr != nil {
164+
t.Fatalf("error dumping bucket %v to file %v: %v", bucketToMove.String(), dumpBucketBeforeMoving, wErr)
165+
}
166+
}
167+
168+
mErr := tx.MoveBucket([]byte(tc.bucketToMove), srcBucket, dstBucket)
169+
require.ErrorIs(t, mErr, tc.expActErr)
170+
171+
return nil
172+
}); err != nil {
173+
t.Fatal(err)
174+
}
175+
db.MustCheck()
176+
177+
// skip assertion if failure expected
178+
if tc.expActErr != nil {
179+
return
180+
}
181+
182+
// assert
183+
if err := db.Update(func(tx *bbolt.Tx) error {
184+
var movedBucket *bbolt.Bucket
185+
srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
186+
187+
if srcBucket != nil {
188+
if movedBucket = srcBucket.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
189+
t.Fatalf("expected childBucket %v to be moved from srcBucket %v", tc.bucketToMove, srcBucket)
190+
}
191+
} else {
192+
if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
193+
t.Fatalf("expected childBucket %v to be moved from root bucket %v", tc.bucketToMove, "root bucket")
194+
}
195+
}
196+
197+
dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
198+
if dstBucket != nil {
199+
if movedBucket = dstBucket.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
200+
t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, dstBucket)
201+
}
202+
} else {
203+
if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
204+
t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, "root bucket")
205+
}
206+
}
207+
208+
wErr := dumpBucket([]byte(tc.bucketToMove), movedBucket, dumpBucketAfterMoving)
209+
if wErr != nil {
210+
t.Fatalf("error dumping bucket %v to file %v", movedBucket.String(), dumpBucketAfterMoving)
211+
}
212+
213+
beforeBucket := readBucketFromFile(t, dumpBucketBeforeMoving)
214+
afterBucket := readBucketFromFile(t, dumpBucketAfterMoving)
215+
216+
if !bytes.Equal(beforeBucket, afterBucket) {
217+
t.Fatalf("bucket's content before moving is different than after moving")
218+
}
219+
220+
return nil
221+
}); err != nil {
222+
t.Fatal(err)
223+
}
224+
db.MustCheck()
225+
})
226+
}
227+
}
228+
229+
func openBuckets(t testing.TB, tx *bbolt.Tx, incompatibleKey bool, init bool, parent bool, paths ...string) *bbolt.Bucket {
230+
t.Helper()
231+
232+
var bk *bbolt.Bucket
233+
var err error
234+
235+
idx := len(paths) - 1
236+
for i, key := range paths {
237+
if len(key) == 0 {
238+
if !init {
239+
break
240+
}
241+
continue
242+
}
243+
if (incompatibleKey && i == idx) || (parent && i == idx) {
244+
continue
245+
}
246+
if bk == nil {
247+
bk, err = tx.CreateBucketIfNotExists([]byte(key))
248+
} else {
249+
bk, err = bk.CreateBucketIfNotExists([]byte(key))
250+
}
251+
if err != nil {
252+
t.Fatalf("error creating bucket %v: %v", key, err)
253+
}
254+
if init {
255+
insertRandKeysValuesBucket(t, bk, rand.Intn(4096))
256+
}
257+
}
258+
259+
return bk
260+
}
261+
262+
func readBucketFromFile(t testing.TB, tmpFile string) []byte {
263+
data, err := os.ReadFile(tmpFile)
264+
if err != nil {
265+
t.Fatalf("error reading temp file %v", tmpFile)
266+
}
267+
268+
return data
269+
}
270+
271+
func insertRandKeysValuesBucket(t testing.TB, bk *bbolt.Bucket, n int) {
272+
var min, max = 1, 1024
273+
274+
for i := 0; i < n; i++ {
275+
// generate rand key/value length
276+
keyLength := rand.Intn(max-min) + min
277+
valLength := rand.Intn(max-min) + min
278+
279+
keyData := make([]byte, keyLength)
280+
valData := make([]byte, valLength)
281+
282+
_, err := crand.Read(keyData)
283+
require.NoError(t, err)
284+
285+
_, err = crand.Read(valData)
286+
require.NoError(t, err)
287+
288+
err = bk.Put(keyData, valData)
289+
require.NoError(t, err)
290+
}
291+
}

utils_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package bbolt_test
2+
3+
import (
4+
bolt "go.etcd.io/bbolt"
5+
"go.etcd.io/bbolt/internal/common"
6+
)
7+
8+
// `dumpBucket` dumps all the data, including both key/value data
9+
// and child buckets, from the source bucket into the target db file.
10+
func dumpBucket(srcBucketName []byte, srcBucket *bolt.Bucket, dstFilename string) error {
11+
common.Assert(len(srcBucketName) != 0, "source bucket name can't be empty")
12+
common.Assert(srcBucket != nil, "the source bucket can't be nil")
13+
common.Assert(len(dstFilename) != 0, "the target file path can't be empty")
14+
15+
dstDB, err := bolt.Open(dstFilename, 0600, nil)
16+
if err != nil {
17+
return err
18+
}
19+
20+
return dstDB.Update(func(tx *bolt.Tx) error {
21+
dstBucket, err := tx.CreateBucket(srcBucketName)
22+
if err != nil {
23+
return err
24+
}
25+
return cloneBucket(srcBucket, dstBucket)
26+
})
27+
}
28+
29+
func cloneBucket(src *bolt.Bucket, dst *bolt.Bucket) error {
30+
return src.ForEach(func(k, v []byte) error {
31+
if v == nil {
32+
srcChild := src.Bucket(k)
33+
dstChild, err := dst.CreateBucket(k)
34+
if err != nil {
35+
return err
36+
}
37+
if err = dstChild.SetSequence(srcChild.Sequence()); err != nil {
38+
return err
39+
}
40+
41+
return cloneBucket(srcChild, dstChild)
42+
}
43+
44+
return dst.Put(k, v)
45+
})
46+
}

0 commit comments

Comments
 (0)