-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: update neofs-sdk-go
#3725
*: update neofs-sdk-go
#3725
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,19 +3,17 @@ | |
import ( | ||
"context" | ||
"fmt" | ||
"slices" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/nspcc-dev/neo-go/cli/cmdargs" | ||
"github.com/nspcc-dev/neo-go/cli/options" | ||
"github.com/nspcc-dev/neo-go/pkg/core/block" | ||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" | ||
"github.com/nspcc-dev/neo-go/pkg/io" | ||
"github.com/nspcc-dev/neo-go/pkg/rpcclient" | ||
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs" | ||
"github.com/nspcc-dev/neo-go/pkg/util" | ||
"github.com/nspcc-dev/neo-go/pkg/wallet" | ||
"github.com/nspcc-dev/neofs-sdk-go/client" | ||
"github.com/nspcc-dev/neofs-sdk-go/container" | ||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
|
@@ -107,12 +105,12 @@ | |
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1) | ||
} | ||
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight) | ||
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug) | ||
i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc.PrivateKey(), signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug) | ||
if err != nil { | ||
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1) | ||
} | ||
|
||
err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, acc, attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), numWorkers, maxRetries, debug) | ||
err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, user.NewFromScriptHash(acc.ScriptHash()), attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), numWorkers, maxRetries, debug) | ||
if err != nil { | ||
return cli.Exit(fmt.Errorf("failed to upload objects: %w", err), 1) | ||
} | ||
|
@@ -140,7 +138,7 @@ | |
} | ||
|
||
// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool. | ||
func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error { | ||
func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, ownerID user.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error { | ||
if currentIndexFileID*indexFileSize >= currentBlockHeight { | ||
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight) | ||
return nil | ||
|
@@ -152,15 +150,14 @@ | |
errCh = make(chan error) | ||
doneCh = make(chan struct{}) | ||
wg sync.WaitGroup | ||
emptyOID = make([]byte, neofs.OIDSize) | ||
) | ||
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1) | ||
wg.Add(int(numWorkers)) | ||
for i := range numWorkers { | ||
go func(i uint) { | ||
defer wg.Done() | ||
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers { | ||
if slices.Compare(buf[blockIndex%indexFileSize*neofs.OIDSize:blockIndex%indexFileSize*neofs.OIDSize+neofs.OIDSize], emptyOID) != 0 { | ||
if !oid.ID(buf[blockIndex%indexFileSize*oid.Size : blockIndex%indexFileSize*oid.Size+oid.Size]).IsZero() { | ||
if debug { | ||
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex) | ||
} | ||
|
@@ -207,7 +204,7 @@ | |
) | ||
errRetr := retry(func() error { | ||
var errUpload error | ||
resOid, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs) | ||
resOid, errUpload = uploadObj(ctx.Context, p, signer, ownerID, containerID, objBytes, attrs) | ||
if errUpload != nil { | ||
return errUpload | ||
} | ||
|
@@ -223,7 +220,7 @@ | |
} | ||
return | ||
} | ||
resOid.Encode(buf[blockIndex%indexFileSize*neofs.OIDSize:]) | ||
copy(buf[blockIndex%indexFileSize*oid.Size:], resOid[:]) | ||
} | ||
}(i) | ||
} | ||
|
@@ -241,9 +238,9 @@ | |
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1) | ||
|
||
// Additional check for empty OIDs in the buffer. | ||
for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OIDSize; k += neofs.OIDSize { | ||
if slices.Compare(buf[k:k+neofs.OIDSize], emptyOID) == 0 { | ||
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OIDSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OIDSize) | ||
for k := uint(0); k < (indexFileEnd-indexFileStart)*oid.Size; k += oid.Size { | ||
if oid.ID(buf[k : k+oid.Size]).IsZero() { | ||
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oid.Size, indexFileStart/indexFileSize*indexFileSize+k/oid.Size) | ||
} | ||
} | ||
if indexFileEnd-indexFileStart == indexFileSize { | ||
|
@@ -254,7 +251,7 @@ | |
} | ||
err := retry(func() error { | ||
var errUpload error | ||
_, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs) | ||
_, errUpload = uploadObj(ctx.Context, p, signer, ownerID, containerID, buf, attrs) | ||
return errUpload | ||
}, maxRetries, debug) | ||
if err != nil { | ||
|
@@ -268,10 +265,10 @@ | |
} | ||
|
||
// searchIndexFile returns the ID and buffer for the next index file to be uploaded. | ||
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { | ||
func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) { | ||
var ( | ||
// buf is used to store OIDs of the uploaded blocks. | ||
buf = make([]byte, indexFileSize*neofs.OIDSize) | ||
buf = make([]byte, indexFileSize*oid.Size) | ||
doneCh = make(chan struct{}) | ||
errCh = make(chan error) | ||
|
||
|
@@ -283,7 +280,7 @@ | |
// Search for existing index files. | ||
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual) | ||
for i := 0; ; i++ { | ||
indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters) | ||
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters) | ||
count := 0 | ||
for range indexIDs { | ||
count++ | ||
|
@@ -338,14 +335,14 @@ | |
} | ||
pos := uint(blockIndex) % indexFileSize | ||
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok { | ||
id.Encode(buf[pos*neofs.OIDSize:]) | ||
copy(buf[pos*oid.Size:], id[:]) | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// Search for blocks within the index file range. | ||
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh) | ||
objIDs := searchObjects(ctx.Context, p, containerID, privKeys, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh) | ||
for id := range objIDs { | ||
oidCh <- id | ||
} | ||
|
@@ -364,7 +361,7 @@ | |
// searchObjects searches in parallel for objects with attribute GE startIndex and LT | ||
// endIndex. It returns a buffered channel of resulting object IDs and closes it once | ||
// OID search is finished. Errors are sent to errCh in a non-blocking way. | ||
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { | ||
func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID { | ||
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize) | ||
go func() { | ||
var wg sync.WaitGroup | ||
|
@@ -402,7 +399,7 @@ | |
var objIDs []oid.ID | ||
err := retry(func() error { | ||
var errBlockSearch error | ||
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm) | ||
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, privKeys, containerID.String(), prm) | ||
return errBlockSearch | ||
}, maxRetries, debug) | ||
if err != nil { | ||
|
@@ -426,18 +423,16 @@ | |
} | ||
|
||
// uploadObj uploads object to the container using provided settings. | ||
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) { | ||
func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, ownerID user.ID, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) { | ||
var ( | ||
ownerID user.ID | ||
hdr object.Object | ||
prmObjectPutInit client.PrmObjectPutInit | ||
resOID = oid.ID{} | ||
) | ||
|
||
ownerID.SetScriptHash(owner) | ||
hdr.SetPayload(objData) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no-op There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What exactly is a no-op here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. payload is unprocessed for |
||
hdr.SetContainerID(containerID) | ||
hdr.SetOwnerID(&ownerID) | ||
hdr.SetOwner(ownerID) | ||
hdr.SetAttributes(attrs...) | ||
|
||
writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit) | ||
|
@@ -455,7 +450,7 @@ | |
} | ||
res := writer.GetResult() | ||
resOID = res.StoredObjectID() | ||
if resOID.Equals(oid.ID{}) { | ||
if resOID.IsZero() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is done by |
||
return resOID, fmt.Errorf("object ID is empty") | ||
} | ||
return resOID, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
signer
carries ID so u may dropownerID
prmThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retry 0: failed to close object writer: status: code = 1024 message = missing object owner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if I removing
hdr.SetOwner(ownerID)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hdr.SetOwner(signer.UserID())