Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve KeyManager API import operation #968

Merged
merged 10 commits into from
Feb 16, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Bugs fixed
- Ensure that Web3Signer stops the http server when a sigterm is received
- Improve Key Manager API import operation to use parallel processing instead of serial processing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think this is more of an improvement than a bug but you might disagree :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will move it to Improvements section


## 24.1.1

Expand Down
18 changes: 8 additions & 10 deletions core/src/main/java/tech/pegasys/web3signer/core/Eth2Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,14 @@ private void registerEth2Routes(

router
.route(HttpMethod.POST, KEYSTORES_PATH)
.handler(
new BlockingHandlerDecorator(
new ImportKeystoresHandler(
objectMapper,
baseConfig.getKeyConfigPath(),
slashingProtectionContext.map(
SlashingProtectionContext::getSlashingProtection),
blsSignerProvider,
validatorManager),
false))
.blockingHandler(
new ImportKeystoresHandler(
objectMapper,
baseConfig.getKeyConfigPath(),
slashingProtectionContext.map(SlashingProtectionContext::getSlashingProtection),
blsSignerProvider,
validatorManager),
false)
.failureHandler(errorHandler);

router
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports;

public record ImportKeystoreData(
int index,
String pubKey,
String keystoreJson,
String password,
ImportKeystoreResult importKeystoreResult)
implements Comparable<ImportKeystoreData> {

@Override
public int compareTo(ImportKeystoreData other) {
return Integer.compare(this.index, other.index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;

public class ImportKeystoreResult {
private final ImportKeystoreStatus status;
private final String message;
private ImportKeystoreStatus status;
private String message;

@JsonCreator
public ImportKeystoreResult(
Expand All @@ -36,4 +36,12 @@ public ImportKeystoreStatus getStatus() {
public String getMessage() {
return message;
}

public void setStatus(final ImportKeystoreStatus status) {
this.status = status;
}

public void setMessage(final String message) {
this.message = message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE;
import static tech.pegasys.web3signer.core.service.http.handlers.ContentTypes.JSON_UTF_8;
import static tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports.ImportKeystoreStatus.DUPLICATE;
import static tech.pegasys.web3signer.core.service.http.handlers.keymanager.imports.ImportKeystoreStatus.IMPORTED;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.KEYSTORE_JSON_EXTENSION;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.KEYSTORE_PASSWORD_EXTENSION;
import static tech.pegasys.web3signer.signing.KeystoreFileManager.METADATA_YAML_EXTENSION;
Expand All @@ -29,13 +31,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -78,110 +79,172 @@ public ImportKeystoresHandler(
public void handle(final RoutingContext context) {
// API spec - https://github.com/ethereum/keymanager-APIs/tree/master/flows#import
final ImportKeystoresRequestBody parsedBody;
// step 0: Parse and verify the request body
try {
parsedBody = parseRequestBody(context.body());
} catch (final IllegalArgumentException | JsonProcessingException e) {
handleInvalidRequest(context, e);
return;
}

// check that keystores have matching passwords
// step 1: verify if keystores/passwords list length is same
if (parsedBody.getKeystores().size() != parsedBody.getPasswords().size()) {
context.fail(BAD_REQUEST);
return;
}

// no keystores passed in, nothing to do, return 200 and empty response.
// step 2: no keystores passed in, nothing to do, return 200 and empty response.
if (parsedBody.getKeystores().isEmpty()) {
try {
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end(
objectMapper.writeValueAsString(
new ImportKeystoresResponse(Collections.emptyList())));
} catch (JsonProcessingException e) {
context.fail(SERVER_ERROR, e);
}
return;
}

// extract pubkeys to import first
final List<String> pubkeysToImport;
try {
pubkeysToImport =
parsedBody.getKeystores().stream()
.map(json -> new JsonObject(json).getString("pubkey"))
.map(IdentifierUtils::normaliseIdentifier)
.collect(Collectors.toList());
} catch (Exception e) {
context.fail(BAD_REQUEST, e);
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end("{\"data\": []}");
return;
}

// load existing keys
final Set<String> existingPubkeys =
// load "active" keys
final Set<String> activePubKeys =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not clear to me what "active" means here and why that's significant for this code. Why is this better than "existingPubKeys"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "active keys" term is used in the specs i.e https://github.com/ethereum/keymanager-APIs/tree/master/flows#import. For us, "active" means keys which are already loaded into memory before this API is called.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've renamed it back to prior one.

artifactSignerProvider.availableIdentifiers().stream()
.map(IdentifierUtils::normaliseIdentifier)
.collect(Collectors.toSet());

// filter out already loaded keys for slashing data import
final List<String> nonLoadedPubkeys =
pubkeysToImport.stream()
.filter(key -> !existingPubkeys.contains(key))
.collect(Collectors.toList());
// map incoming keystores either as duplicate or to be imported
final List<ImportKeystoreData> importKeystoreDataList =
getKeystoreDataToProcess(parsedBody, activePubKeys);

// Step 3: import slashing protection data for all to-be-IMPORTED keys
final List<String> importedPubKeys = getToBeImportedPubKeys(importKeystoreDataList);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/importedPubKeys/pubKeysToBeImported ?


// read slashing protection data if present and import data matching non-loaded keys to import
// only
if (slashingProtection.isPresent()
&& !StringUtils.isEmpty(parsedBody.getSlashingProtection())) {
try {
final InputStream slashingProtectionData =
new ByteArrayInputStream(
parsedBody.getSlashingProtection().getBytes(StandardCharsets.UTF_8));
slashingProtection.get().importDataWithFilter(slashingProtectionData, nonLoadedPubkeys);
} catch (Exception e) {
slashingProtection.get().importDataWithFilter(slashingProtectionData, importedPubKeys);
} catch (final Exception e) {
// since we haven't written any keys to the file system, we don't need to clean up
context.fail(BAD_REQUEST, e);
return;
}
}

final List<ImportKeystoreResult> results = new ArrayList<>();
for (int i = 0; i < parsedBody.getKeystores().size(); i++) {
final String pubkey = pubkeysToImport.get(i);
try {
final String jsonKeystoreData = parsedBody.getKeystores().get(i);
final String password = parsedBody.getPasswords().get(i);

if (existingPubkeys.contains(pubkey)) {
// keystore already loaded
results.add(new ImportKeystoreResult(ImportKeystoreStatus.DUPLICATE, null));
} else {
validatorManager.addValidator(Bytes.fromHexString(pubkey), jsonKeystoreData, password);
results.add(new ImportKeystoreResult(ImportKeystoreStatus.IMPORTED, null));
}
} catch (final Exception e) {
// cleanup the current key being processed and continue
removeSignersAndCleanupImportedKeystoreFiles(List.of(pubkey));
results.add(
new ImportKeystoreResult(
ImportKeystoreStatus.ERROR, "Error importing keystore: " + e.getMessage()));
}
}
// must return status 200 from here onward ...

// step 4: add validators to be imported in parallel stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't we just importing here rather than "adding to be imported"?
Don't think we should mention the parallel stream implementation detail

importValidators(importKeystoreDataList);

// final step, send sorted results ...
try {
final List<ImportKeystoreResult> results = getKeystoreResults(importKeystoreDataList);
context
.response()
.putHeader(CONTENT_TYPE, JSON_UTF_8)
.setStatusCode(SUCCESS)
.end(objectMapper.writeValueAsString(new ImportKeystoresResponse(results)));
} catch (final Exception e) {
removeSignersAndCleanupImportedKeystoreFiles(nonLoadedPubkeys);
// critical bug, clean out imported keystores files ...
removeSignersAndCleanupImportedKeystoreFiles(importedPubKeys);
context.fail(SERVER_ERROR, e);
}
}

/**
* Import validators in parallel stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need javadoc for a private method? If parallel stream is important here maybe rename the method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, good observation, lets me rename the method.

*
* @param importKeystoreDataList List of keystore data to import
*/
private void importValidators(final List<ImportKeystoreData> importKeystoreDataList) {
final List<ImportKeystoreData> toBeImported =
importKeystoreDataList.stream().filter(ImportKeystoresHandler::imported).toList();
toBeImported.parallelStream()
.forEach(
data -> {
try {
final Bytes pubKeyBytes = Bytes.fromHexString(data.pubKey());
validatorManager.addValidator(pubKeyBytes, data.keystoreJson(), data.password());
} catch (final Exception e) {
// modify the result to error status
data.importKeystoreResult().setStatus(ImportKeystoreStatus.ERROR);
data.importKeystoreResult()
.setMessage("Error importing keystore: " + e.getMessage());
}
});

// clean out failed validators
removeSignersAndCleanupImportedKeystoreFiles(getFailedValidators(importKeystoreDataList));
}

/**
* Get the results of the keystore import
*
* @param importKeystoreDataList Import Keystore Data
* @return Import Keystore Results in sorted order
*/
private static List<ImportKeystoreResult> getKeystoreResults(
final List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.sorted()
.map(ImportKeystoreData::importKeystoreResult)
.toList();
}

private List<ImportKeystoreData> getKeystoreDataToProcess(
final ImportKeystoresRequestBody requestBody, final Set<String> activePubKeys) {
return IntStream.range(0, requestBody.getKeystores().size())
.mapToObj(
i -> {
final String jsonKeystoreData = requestBody.getKeystores().get(i);
final String password = requestBody.getPasswords().get(i);
final String pubkey;
try {
pubkey = parseAndNormalizePubKey(jsonKeystoreData);
} catch (final Exception e) {
final ImportKeystoreResult errorResult =
new ImportKeystoreResult(
ImportKeystoreStatus.ERROR, "Error parsing pubkey: " + e.getMessage());
return new ImportKeystoreData(i, null, null, null, errorResult);
}
if (activePubKeys.contains(pubkey)) {
return new ImportKeystoreData(
i, pubkey, null, null, new ImportKeystoreResult(DUPLICATE, null));
}

return new ImportKeystoreData(
i, pubkey, jsonKeystoreData, password, new ImportKeystoreResult(IMPORTED, null));
})
.toList();
}

private static List<String> getToBeImportedPubKeys(
List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.filter(ImportKeystoresHandler::imported)
.map(ImportKeystoreData::pubKey)
.toList();
}

private static List<String> getFailedValidators(List<ImportKeystoreData> importKeystoreDataList) {
return importKeystoreDataList.stream()
.filter(ImportKeystoresHandler::failed)
.map(ImportKeystoreData::pubKey)
.toList();
}

private static boolean imported(ImportKeystoreData data) {
return data.importKeystoreResult().getStatus() == IMPORTED;
}

private static boolean failed(ImportKeystoreData data) {
return data.importKeystoreResult().getStatus() == ImportKeystoreStatus.ERROR
&& data.pubKey() != null;
}

private static String parseAndNormalizePubKey(final String json) {
return IdentifierUtils.normaliseIdentifier(new JsonObject(json).getString("pubkey"));
}

private ImportKeystoresRequestBody parseRequestBody(final RequestBody requestBody)
throws JsonProcessingException {
final String body = requestBody.asString();
Expand Down
Loading