Skip to content

Commit 57ba37e

Browse files
committed
Add log into MoveBucket and clone the key
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
1 parent 0bd26bc commit 57ba37e

File tree

1 file changed

+32
-16
lines changed

1 file changed

+32
-16
lines changed

bucket.go

+32-16
Original file line numberDiff line numberDiff line change
@@ -285,19 +285,21 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
285285
return errors.ErrTxNotWritable
286286
}
287287

288+
newKey := cloneBytes(key)
289+
288290
// Move cursor to correct position.
289291
c := b.Cursor()
290-
k, _, flags := c.seek(key)
292+
k, _, flags := c.seek(newKey)
291293

292294
// Return an error if bucket doesn't exist or is not a bucket.
293-
if !bytes.Equal(key, k) {
295+
if !bytes.Equal(newKey, k) {
294296
return errors.ErrBucketNotFound
295297
} else if (flags & common.BucketLeafFlag) == 0 {
296298
return errors.ErrIncompatibleValue
297299
}
298300

299301
// Recursively delete all child buckets.
300-
child := b.Bucket(key)
302+
child := b.Bucket(newKey)
301303
err = child.ForEachBucket(func(k []byte) error {
302304
if err := child.DeleteBucket(k); err != nil {
303305
return fmt.Errorf("delete bucket: %s", err)
@@ -309,15 +311,15 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
309311
}
310312

311313
// Remove cached copy.
312-
delete(b.buckets, string(key))
314+
delete(b.buckets, string(newKey))
313315

314316
// Release all bucket pages to freelist.
315317
child.nodes = nil
316318
child.rootNode = nil
317319
child.free()
318320

319321
// Delete the node if we have a matching key.
320-
c.node().del(key)
322+
c.node().del(newKey)
321323

322324
return nil
323325
}
@@ -328,48 +330,62 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
328330
// 2. or the key already exists in the destination bucket;
329331
// 3. or the key represents a non-bucket value;
330332
// 4. the source and destination buckets are the same.
331-
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
333+
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
334+
lg := b.tx.db.Logger()
335+
lg.Debugf("Moving bucket %q", string(key))
336+
defer func() {
337+
if err != nil {
338+
lg.Errorf("Moving bucket %q failed: %v", string(key), err)
339+
} else {
340+
lg.Debugf("Moving bucket %q successfully", string(key))
341+
}
342+
}()
343+
332344
if b.tx.db == nil || dstBucket.tx.db == nil {
333345
return errors.ErrTxClosed
334346
} else if !dstBucket.Writable() {
335347
return errors.ErrTxNotWritable
336348
}
337349

350+
newKey := cloneBytes(key)
351+
338352
// Move cursor to correct position.
339353
c := b.Cursor()
340-
k, v, flags := c.seek(key)
354+
k, v, flags := c.seek(newKey)
341355

342356
// Return an error if bucket doesn't exist or is not a bucket.
343-
if !bytes.Equal(key, k) {
357+
if !bytes.Equal(newKey, k) {
344358
return errors.ErrBucketNotFound
345359
} else if (flags & common.BucketLeafFlag) == 0 {
346-
return fmt.Errorf("key %q isn't a bucket in the source bucket: %w", key, errors.ErrIncompatibleValue)
360+
lg.Errorf("An incompatible key %s exists in the source bucket", string(newKey))
361+
return errors.ErrIncompatibleValue
347362
}
348363

349364
// Do nothing (return true directly) if the source bucket and the
350365
// destination bucket are actually the same bucket.
351366
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
352-
return fmt.Errorf("source bucket %s and target bucket %s are the same: %w", b.String(), dstBucket.String(), errors.ErrSameBuckets)
367+
lg.Errorf("The source bucket (%s) and the target bucket (%s) are the same bucket", b.String(), dstBucket.String())
368+
return errors.ErrSameBuckets
353369
}
354370

355371
// check whether the key already exists in the destination bucket
356372
curDst := dstBucket.Cursor()
357-
k, _, flags = curDst.seek(key)
373+
k, _, flags = curDst.seek(newKey)
358374

359375
// Return an error if there is an existing key in the destination bucket.
360-
if bytes.Equal(key, k) {
376+
if bytes.Equal(newKey, k) {
361377
if (flags & common.BucketLeafFlag) != 0 {
362378
return errors.ErrBucketExists
363379
}
364-
return fmt.Errorf("key %q already exists in the target bucket: %w", key, errors.ErrIncompatibleValue)
380+
lg.Errorf("An incompatible key %s exists in the target bucket", string(newKey))
381+
return errors.ErrIncompatibleValue
365382
}
366383

367384
// remove the sub-bucket from the source bucket
368-
delete(b.buckets, string(key))
369-
c.node().del(key)
385+
delete(b.buckets, string(newKey))
386+
c.node().del(newKey)
370387

371388
// add te sub-bucket to the destination bucket
372-
newKey := cloneBytes(key)
373389
newValue := cloneBytes(v)
374390
curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)
375391

0 commit comments

Comments
 (0)