Skip to content

Commit 8e38c33

Browse files
committed
add MoveSubBucket test
Signed-off-by: Mustafa Elbehery <melbeher@redhat.com>
1 parent 3179540 commit 8e38c33

File tree

4 files changed

+346
-3
lines changed

4 files changed

+346
-3
lines changed

bucket.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ func (b *Bucket) DeleteBucket(key []byte) error {
297297
// Returns an error if
298298
// 1. the sub-bucket cannot be found in the source bucket;
299299
// 2. or the key already exists in the destination bucket;
300-
// 3. the key represents a non-bucket value.
300+
// 3. or the key represents a non-bucket value;
301+
// 4. the source and destination buckets are the same.
301302
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
302303
if b.tx.db == nil || dstBucket.tx.db == nil {
303304
return errors.ErrTxClosed
@@ -319,7 +320,7 @@ func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
319320
// Do nothing (return true directly) if the source bucket and the
320321
// destination bucket are actually the same bucket.
321322
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
322-
return nil
323+
return fmt.Errorf("source bucket %s and target bucket %s are the same: %w", b.String(), dstBucket.String(), errors.ErrSameBuckets)
323324
}
324325

325326
// check whether the key already exists in the destination bucket

errors/errors.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ var (
6969
// ErrValueTooLarge is returned when inserting a value that is larger than MaxValueSize.
7070
ErrValueTooLarge = errors.New("value too large")
7171

72-
// ErrIncompatibleValue is returned when trying create or delete a bucket
72+
// ErrIncompatibleValue is returned when trying to create or delete a bucket
7373
// on an existing non-bucket key or when trying to create or delete a
7474
// non-bucket key on an existing bucket key.
7575
ErrIncompatibleValue = errors.New("incompatible value")
76+
77+
// ErrSameBuckets is returned when trying to move a sub-bucket between
78+
// source and target buckets, while source and target buckets are the same.
79+
ErrSameBuckets = errors.New("the source and target are the same bucket")
7680
)

movebucket_test.go

+292
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
package bbolt_test
2+
3+
import (
4+
"bytes"
5+
crand "crypto/rand"
6+
"math/rand"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
11+
"go.etcd.io/bbolt"
12+
"go.etcd.io/bbolt/errors"
13+
"go.etcd.io/bbolt/internal/btesting"
14+
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestTx_MoveBucket(t *testing.T) {
19+
testCases := []struct {
20+
name string
21+
srcBucketPath []string
22+
dstBucketPath []string
23+
bucketToMove string
24+
incompatibleKeyInSrc bool
25+
incompatibleKeyInDst bool
26+
parentSrc bool
27+
parentDst bool
28+
expActErr error
29+
}{
30+
{
31+
name: "happy path",
32+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
33+
dstBucketPath: []string{"db1", "db2"},
34+
bucketToMove: "sb3ToMove",
35+
incompatibleKeyInSrc: false,
36+
incompatibleKeyInDst: false,
37+
parentSrc: true,
38+
parentDst: false,
39+
expActErr: nil,
40+
},
41+
{
42+
name: "bucketToMove not exist in srcBucket",
43+
srcBucketPath: []string{"sb1", "sb2"},
44+
dstBucketPath: []string{"db1", "db2"},
45+
bucketToMove: "sb3ToMove",
46+
incompatibleKeyInSrc: false,
47+
incompatibleKeyInDst: false,
48+
parentSrc: false,
49+
parentDst: false,
50+
expActErr: errors.ErrBucketNotFound,
51+
},
52+
{
53+
name: "bucketToMove exist in dstBucket",
54+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
55+
dstBucketPath: []string{"db1", "db2", "sb3ToMove"},
56+
bucketToMove: "sb3ToMove",
57+
incompatibleKeyInSrc: false,
58+
incompatibleKeyInDst: false,
59+
parentSrc: true,
60+
parentDst: true,
61+
expActErr: errors.ErrBucketExists,
62+
},
63+
{
64+
name: "bucketToMove key exist in srcBucket but no subBucket value",
65+
srcBucketPath: []string{"sb1", "sb2"},
66+
dstBucketPath: []string{"db1", "db2"},
67+
bucketToMove: "sb3ToMove",
68+
incompatibleKeyInSrc: true,
69+
incompatibleKeyInDst: false,
70+
parentSrc: true,
71+
parentDst: false,
72+
expActErr: errors.ErrIncompatibleValue,
73+
},
74+
{
75+
name: "bucketToMove key exist in dstBucket but no subBucket value",
76+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
77+
dstBucketPath: []string{"db1", "db2"},
78+
bucketToMove: "sb3ToMove",
79+
incompatibleKeyInSrc: false,
80+
incompatibleKeyInDst: true,
81+
parentSrc: true,
82+
parentDst: true,
83+
expActErr: errors.ErrIncompatibleValue,
84+
},
85+
{
86+
name: "srcBucket is rootBucket",
87+
srcBucketPath: []string{"", "sb3ToMove"},
88+
dstBucketPath: []string{"db1", "db2"},
89+
bucketToMove: "sb3ToMove",
90+
incompatibleKeyInSrc: false,
91+
incompatibleKeyInDst: false,
92+
parentSrc: true,
93+
parentDst: false,
94+
expActErr: nil,
95+
},
96+
{
97+
name: "dstBucket is rootBucket",
98+
srcBucketPath: []string{"sb1", "sb2", "sb3ToMove"},
99+
dstBucketPath: []string{""},
100+
bucketToMove: "sb3ToMove",
101+
incompatibleKeyInSrc: false,
102+
incompatibleKeyInDst: false,
103+
parentSrc: true,
104+
parentDst: false,
105+
expActErr: nil,
106+
},
107+
{
108+
name: "srcBucket is rootBucket and dstBucket is rootBucket",
109+
srcBucketPath: []string{"", "sb3ToMove"},
110+
dstBucketPath: []string{""},
111+
bucketToMove: "sb3ToMove",
112+
incompatibleKeyInSrc: false,
113+
incompatibleKeyInDst: false,
114+
parentSrc: false,
115+
parentDst: false,
116+
expActErr: errors.ErrSameBuckets,
117+
},
118+
}
119+
120+
for _, tc := range testCases {
121+
122+
t.Run(tc.name, func(*testing.T) {
123+
db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: pageSize})
124+
125+
dumpBucketBeforeMoving := filepath.Join(t.TempDir(), "beforeBucket.db")
126+
dumpBucketAfterMoving := filepath.Join(t.TempDir(), "afterBucket.db")
127+
128+
// arrange
129+
if err := db.Update(func(tx *bbolt.Tx) error {
130+
srcBucket := openBuckets(t, tx, tc.incompatibleKeyInSrc, true, false, tc.srcBucketPath...)
131+
dstBucket := openBuckets(t, tx, tc.incompatibleKeyInDst, true, false, tc.dstBucketPath...)
132+
133+
if tc.incompatibleKeyInSrc {
134+
if pErr := srcBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
135+
t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", srcBucket, pErr)
136+
}
137+
}
138+
139+
if tc.incompatibleKeyInDst {
140+
if pErr := dstBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
141+
t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", dstBucket, pErr)
142+
}
143+
}
144+
145+
return nil
146+
}); err != nil {
147+
t.Fatal(err)
148+
}
149+
db.MustCheck()
150+
151+
// act
152+
if err := db.Update(func(tx *bbolt.Tx) error {
153+
srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
154+
dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
155+
156+
var bucketToMove *bbolt.Bucket
157+
if srcBucket != nil {
158+
bucketToMove = srcBucket.Bucket([]byte(tc.bucketToMove))
159+
} else {
160+
bucketToMove = tx.Bucket([]byte(tc.bucketToMove))
161+
}
162+
163+
if tc.expActErr == nil && bucketToMove != nil {
164+
if wErr := dumpBucket([]byte(tc.bucketToMove), bucketToMove, dumpBucketBeforeMoving); wErr != nil {
165+
t.Fatalf("error dumping bucket %v to file %v: %v", bucketToMove.String(), dumpBucketBeforeMoving, wErr)
166+
}
167+
}
168+
169+
mErr := tx.MoveBucket([]byte(tc.bucketToMove), srcBucket, dstBucket)
170+
require.ErrorIs(t, mErr, tc.expActErr)
171+
172+
return nil
173+
}); err != nil {
174+
t.Fatal(err)
175+
}
176+
db.MustCheck()
177+
178+
// skip assertion if failure expected
179+
if tc.expActErr != nil {
180+
return
181+
}
182+
183+
// assert
184+
if err := db.Update(func(tx *bbolt.Tx) error {
185+
var movedBucket *bbolt.Bucket
186+
srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
187+
188+
if srcBucket != nil {
189+
if movedBucket = srcBucket.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
190+
t.Fatalf("expected childBucket %v to be moved from srcBucket %v", tc.bucketToMove, srcBucket)
191+
}
192+
} else {
193+
if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
194+
t.Fatalf("expected childBucket %v to be moved from root bucket %v", tc.bucketToMove, "root bucket")
195+
}
196+
}
197+
198+
dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
199+
if dstBucket != nil {
200+
if movedBucket = dstBucket.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
201+
t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, dstBucket)
202+
}
203+
} else {
204+
if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
205+
t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, "root bucket")
206+
}
207+
}
208+
209+
wErr := dumpBucket([]byte(tc.bucketToMove), movedBucket, dumpBucketAfterMoving)
210+
if wErr != nil {
211+
t.Fatalf("error dumping bucket %v to file %v", movedBucket.String(), dumpBucketAfterMoving)
212+
}
213+
214+
beforeBucket := readBucketFromFile(t, dumpBucketBeforeMoving)
215+
afterBucket := readBucketFromFile(t, dumpBucketAfterMoving)
216+
217+
if !bytes.Equal(beforeBucket, afterBucket) {
218+
t.Fatalf("bucket's content before moving is different than after moving")
219+
}
220+
221+
return nil
222+
}); err != nil {
223+
t.Fatal(err)
224+
}
225+
db.MustCheck()
226+
})
227+
}
228+
}
229+
230+
func openBuckets(t testing.TB, tx *bbolt.Tx, incompatibleKey bool, init bool, parent bool, paths ...string) *bbolt.Bucket {
231+
t.Helper()
232+
233+
var bk *bbolt.Bucket
234+
var err error
235+
236+
idx := len(paths) - 1
237+
for i, key := range paths {
238+
if len(key) == 0 {
239+
if !init {
240+
break
241+
}
242+
continue
243+
}
244+
if (incompatibleKey && i == idx) || (parent && i == idx) {
245+
continue
246+
}
247+
if bk == nil {
248+
bk, err = tx.CreateBucketIfNotExists([]byte(key))
249+
} else {
250+
bk, err = bk.CreateBucketIfNotExists([]byte(key))
251+
}
252+
if err != nil {
253+
t.Fatalf("error creating bucket %v: %v", key, err)
254+
}
255+
if init {
256+
insertRandKeysValuesBucket(t, bk, rand.Intn(4096))
257+
}
258+
}
259+
260+
return bk
261+
}
262+
263+
func readBucketFromFile(t testing.TB, tmpFile string) []byte {
264+
data, err := os.ReadFile(tmpFile)
265+
if err != nil {
266+
t.Fatalf("error reading temp file %v", tmpFile)
267+
}
268+
269+
return data
270+
}
271+
272+
func insertRandKeysValuesBucket(t testing.TB, bk *bbolt.Bucket, n int) {
273+
var min, max = 1, 1024
274+
275+
for i := 0; i < n; i++ {
276+
// generate rand key/value length
277+
keyLength := rand.Intn(max-min) + min
278+
valLength := rand.Intn(max-min) + min
279+
280+
keyData := make([]byte, keyLength)
281+
valData := make([]byte, valLength)
282+
283+
_, err := crand.Read(keyData)
284+
require.NoError(t, err)
285+
286+
_, err = crand.Read(valData)
287+
require.NoError(t, err)
288+
289+
err = bk.Put(keyData, valData)
290+
require.NoError(t, err)
291+
}
292+
}

utils_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package bbolt_test
2+
3+
import (
4+
bolt "go.etcd.io/bbolt"
5+
"go.etcd.io/bbolt/internal/common"
6+
)
7+
8+
// `dumpBucket` dumps all the data, including both key/value data
9+
// and child buckets, from the source bucket into the target db file.
10+
func dumpBucket(srcBucketName []byte, srcBucket *bolt.Bucket, dstFilename string) error {
11+
common.Assert(len(srcBucketName) != 0, "source bucket name can't be empty")
12+
common.Assert(srcBucket != nil, "the source bucket can't be nil")
13+
common.Assert(len(dstFilename) != 0, "the target file path can't be empty")
14+
15+
dstDB, err := bolt.Open(dstFilename, 0600, nil)
16+
if err != nil {
17+
return err
18+
}
19+
20+
return dstDB.Update(func(tx *bolt.Tx) error {
21+
dstBucket, err := tx.CreateBucket(srcBucketName)
22+
if err != nil {
23+
return err
24+
}
25+
return cloneBucket(srcBucket, dstBucket)
26+
})
27+
}
28+
29+
func cloneBucket(src *bolt.Bucket, dst *bolt.Bucket) error {
30+
return src.ForEach(func(k, v []byte) error {
31+
if v == nil {
32+
srcChild := src.Bucket(k)
33+
dstChild, err := dst.CreateBucket(k)
34+
if err != nil {
35+
return err
36+
}
37+
if err = dstChild.SetSequence(srcChild.Sequence()); err != nil {
38+
return err
39+
}
40+
41+
return cloneBucket(srcChild, dstChild)
42+
}
43+
44+
return dst.Put(k, v)
45+
})
46+
}

0 commit comments

Comments
 (0)