KCL 1.X Fix for ShardEnd corruption and preventing lease table interference in multi-app JVM #776

Merged: 2 commits, Jan 26, 2021
@@ -45,6 +45,8 @@
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;

import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.fromHashKeyRange;

/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@@ -617,7 +619,7 @@ static KinesisClientLease newKCLLease(Shard shard) {
}
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);

newLease.setHashKeyRange(fromHashKeyRange(shard.getHashKeyRange()));
return newLease;
}

@@ -641,6 +643,7 @@ static KinesisClientLease newKCLLeaseForChildShard(ChildShard childShard) throws
newLease.setParentShardIds(parentShardIds);
newLease.setOwnerSwitchesSinceCheckpoint(0L);
newLease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
newLease.setHashKeyRange(fromHashKeyRange(childShard.getHashKeyRange()));
return newLease;
}

@@ -243,7 +243,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));
@@ -96,9 +96,9 @@ public class Worker implements Runnable {
// Default configs for periodic shard sync
private static final int SHARD_SYNC_SLEEP_FOR_PERIODIC_SHARD_SYNC = 0;
private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; //Default for KCL.
static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
static long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
static long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
static long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
Comment on lines -99 to +101

Contributor: Why are we marking these as non-final?

Contributor: Never mind, I see it's for unit tests. Ideally we would use the builder pattern to avoid these lengthy constructors; how bad is the build time if we don't override these values in tests?

Contributor (author): Yes, we should refactor the long-arg constructors. For now this temporarily gives the constants package-level access; it saves more than 3 minutes of build time.


private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();
private static final LeaseCleanupValidator DEFAULT_LEASE_CLEANUP_VALIDATOR = new KinesisLeaseCleanupValidator();
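
As an aside to the discussion above, here is a hypothetical sketch (not taken from this PR's tests) of how a unit test could shorten the now package-private wait constants so that worker startup tests do not poll the lease table for up to 30 seconds; the test class name and override values are illustrative only.

package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import org.junit.Before;
import org.junit.Test;

public class WorkerLeaseTableWaitOverrideTest {
    @Before
    public void shortenLeaseTableWaits() {
        // Package-private statics on Worker (see the diff above); shortened so the
        // lease table readiness check finishes in milliseconds during tests.
        Worker.LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 100L;
        Worker.MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 10L;
        Worker.MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 200L;
    }

    @Test
    public void placeholderTest() {
        // A real test would construct and exercise a Worker here; omitted in this sketch.
    }
}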
@@ -576,7 +576,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());
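
To illustrate the effect of the change above from LeaseCleanupManager.createOrGetInstance to LeaseCleanupManager.newInstance, here is a hedged wiring sketch (not part of the PR; the import paths and numeric settings are assumptions): each application in a shared JVM now builds its own cleanup manager bound to its own stream proxy and lease manager, so cleanup for one application can no longer act on another application's lease table.

import java.util.concurrent.Executors;

import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

class PerApplicationCleanupWiring {
    // One cleanup manager per application, never shared across lease tables.
    static LeaseCleanupManager buildFor(IKinesisProxy proxy,
                                        ILeaseManager<KinesisClientLease> leaseManager,
                                        IMetricsFactory metricsFactory) {
        return LeaseCleanupManager.newInstance(proxy, leaseManager,
                Executors.newSingleThreadScheduledExecutor(), metricsFactory,
                true,       // cleanupLeasesUponShardCompletion
                60_000L,    // leaseCleanupIntervalMillis (placeholder)
                300_000L,   // completedLeaseCleanupIntervalMillis (placeholder)
                600_000L,   // garbageLeaseCleanupIntervalMillis (placeholder)
                10_000);    // maxRecords (placeholder)
    }
}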
@@ -68,6 +68,11 @@ public Map<String, AttributeValue> toDynamoRecord(KinesisClientLease lease) {
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()));
}

if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()));
result.put(ENDING_HASH_KEY, DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()));
}

return result;
}

@@ -92,6 +97,12 @@ public KinesisClientLease fromDynamoRecord(Map<String, AttributeValue> dynamoRec
);
}

final String startingHashKey, endingHashKey;
if (!Strings.isNullOrEmpty(startingHashKey = DynamoUtils.safeGetString(dynamoRecord, STARTING_HASH_KEY))
&& !Strings.isNullOrEmpty(endingHashKey = DynamoUtils.safeGetString(dynamoRecord, ENDING_HASH_KEY))) {
result.setHashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}

return result;
}
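
A minimal, hypothetical round-trip sketch of the new hash key range persistence shown above (not part of the PR): it writes the range to two DynamoDB string attributes and reads it back with HashKeyRangeForLease.deserialize. The literal attribute names and the hand-built map are assumptions for illustration; the real serializer uses its STARTING_HASH_KEY / ENDING_HASH_KEY constants and the DynamoUtils helpers.

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease;

class HashKeyRangeRoundTripSketch {
    // Writes the two hash key attributes the serializer now emits for each lease.
    static Map<String, AttributeValue> write(HashKeyRangeForLease range) {
        Map<String, AttributeValue> record = new HashMap<>();
        record.put("startingHashKey", new AttributeValue(range.serializedStartingHashKey()));
        record.put("endingHashKey", new AttributeValue(range.serializedEndingHashKey()));
        return record;
    }

    // Reads them back into a HashKeyRangeForLease, as fromDynamoRecord now does.
    static HashKeyRangeForLease read(Map<String, AttributeValue> record) {
        return HashKeyRangeForLease.deserialize(
                record.get("startingHashKey").getS(),
                record.get("endingHashKey").getS());
    }
}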

@@ -163,6 +174,11 @@ public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(KinesisClien
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getChildShardIds()), AttributeAction.PUT));
}

if(lease.getHashKeyRange() != null) {
result.put(STARTING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedStartingHashKey()), AttributeAction.PUT));
result.put(ENDING_HASH_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getHashKeyRange().serializedEndingHashKey()), AttributeAction.PUT));
}

if (lease.getPendingCheckpoint() != null && !lease.getPendingCheckpoint().getSequenceNumber().isEmpty()) {
result.put(PENDING_CHECKPOINT_SEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSequenceNumber()), AttributeAction.PUT));
result.put(PENDING_CHECKPOINT_SUBSEQUENCE_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(lease.getPendingCheckpoint().getSubSequenceNumber()), AttributeAction.PUT));
@@ -181,7 +197,10 @@ public Map<String, AttributeValueUpdate> getDynamoUpdateLeaseUpdate(KinesisClien

switch (updateField) {
case CHILD_SHARDS:
// TODO: Implement update fields for child shards
if (!CollectionUtils.isNullOrEmpty(lease.getChildShardIds())) {
result.put(CHILD_SHARD_IDS_KEY, new AttributeValueUpdate(DynamoUtils.createAttributeValue(
lease.getChildShardIds()), AttributeAction.PUT));
}
break;
case HASH_KEY_RANGE:
if (lease.getHashKeyRange() != null) {
@@ -15,6 +15,7 @@
* limitations under the License.
*/

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
@@ -81,10 +82,8 @@ public class LeaseCleanupManager {
@Getter
private volatile boolean isRunning = false;

private static LeaseCleanupManager instance;

/**
* Factory method to return a singleton instance of {@link LeaseCleanupManager}.
* Method to return a new instance of {@link LeaseCleanupManager}.
* @param kinesisProxy
* @param leaseManager
* @param deletionThreadPool
@@ -96,17 +95,13 @@ public class LeaseCleanupManager {
* @param maxRecords
* @return
*/
public static LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
int maxRecords) {
if (instance == null) {
instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
}

return instance;
public static LeaseCleanupManager newInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
int maxRecords) {
return new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
}

/**
@@ -181,6 +176,7 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion
boolean alreadyCheckedForGarbageCollection = false;
boolean wereChildShardsPresent = false;
boolean wasResourceNotFound = false;
String cleanupFailureReason = "";

try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
@@ -189,62 +185,72 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion
Set<String> childShardKeys = leaseFromDDB.getChildShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try {
// throws ResourceNotFoundException
childShardKeys = getChildShardsFromService(shardInfo);

if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
// If no child shards are found in DDB or from the service, do not delete the lease
Contributor (@joshua-kim, Jan 25, 2021): I don't think this adds additional safety, since this is just retried either way on the next deletion run. Are we adding this for unit testing?

Contributor (author): This will prevent the lease deletion and log an error, since this closed and valid shard could not retrieve child shard info from anywhere. It is a final guard to protect against a bad server response.

throw new InvalidStateException("No child shards found for this supposedly " +
"closed shard in both local DDB and in service " + shardInfo.getShardId());
} else {
wereChildShardsPresent = true;
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
}
} catch (ResourceNotFoundException e) {
throw e;
} finally {
// We rely on resource presence in service for garbage collection. Since we already
// made a call to getChildShardsFromService, we would come to know whether the resource
// is present or not. In the latter case, we would throw ResourceNotFoundException, which is
// handled in the catch block.
alreadyCheckedForGarbageCollection = true;
}
} else {
wereChildShardsPresent = true;
}
try {
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys);
final CompletedShardResult completedShardResult = cleanupLeaseForCompletedShard(lease, childShardKeys);
cleanedUpCompletedLease = completedShardResult.cleanedUp();
cleanupFailureReason = completedShardResult.failureMsg();
} catch (Exception e) {
// Suppressing the exception here, so that we can attempt for garbage cleanup.
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId());
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " due to " + e.getMessage());
}
} else {
LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
cleanedUpCompletedLease = true;
}
}

if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
try {
wereChildShardsPresent = !CollectionUtils
if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
// throws ResourceNotFoundException
wereChildShardsPresent = !CollectionUtils
.isNullOrEmpty(getChildShardsFromService(shardInfo));
} catch (ResourceNotFoundException e) {
throw e;
}
}
} catch (ResourceNotFoundException e) {
wasResourceNotFound = true;
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
cleanupFailureReason = cleanedUpGarbageLease ? "" : "DDB Lease Deletion Failed";
} catch (Exception e) {
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId() + " : " + e.getMessage());
cleanupFailureReason = e.getMessage();
}

return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
wasResourceNotFound);
wasResourceNotFound, cleanupFailureReason);
}

private Set<String> getChildShardsFromService(ShardInfo shardInfo) {
final String iterator = kinesisProxy.getIterator(shardInfo.getShardId(), ShardIteratorType.LATEST.toString());
return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet());
}


// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard).
private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream.");
leaseManager.deleteLease(lease);
try {
leaseManager.deleteLease(lease);
} catch (Exception e) {
LOG.warn("Lease deletion failed for " + lease.getLeaseKey() + " due to " + e.getMessage());
return false;
}
return true;
}

@@ -264,8 +270,9 @@ private boolean allParentShardLeasesDeleted(KinesisClientLease lease) throws Dep
// We should only be deleting the current shard's lease if
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
// 2. Its parent shard lease(s) have already been deleted.
private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {

final Set<String> processedChildShardLeaseKeys = new HashSet<>();

for (String childShardLeaseKey : childShardLeaseKeys) {
@@ -281,22 +288,25 @@ private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<Stri
}
}

if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
return false;
boolean parentShardsDeleted = allParentShardLeasesDeleted(lease);
boolean childrenStartedProcessing = Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys);

if (!parentShardsDeleted || !childrenStartedProcessing) {
return new CompletedShardResult(false, !parentShardsDeleted ? "Parent shard(s) not deleted yet" : "Child shard(s) yet to begin processing");
}

LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun.");
leaseManager.deleteLease(lease);

return true;
return new CompletedShardResult(true, "");
}

private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
final KinesisClientLease updatedLease = leasePendingDeletion.lease();
updatedLease.setChildShardIds(childShardKeys);

leaseManager.updateLease(updatedLease);
leaseManager.updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
}

@VisibleForTesting
@@ -364,9 +374,17 @@ public static class LeaseCleanupResult {
boolean cleanedUpGarbageLease;
boolean wereChildShardsPresent;
boolean wasResourceNotFound;
String cleanupFailureReason;

public boolean leaseCleanedUp() {
return cleanedUpCompletedLease | cleanedUpGarbageLease;
}
}

@Value
@Accessors(fluent = true)
private static class CompletedShardResult {
boolean cleanedUp;
String failureMsg;
}
}
@@ -624,7 +624,6 @@ public void updateLeaseWithMetaInfo(T lease, UpdateField updateField)
request.setExpected(serializer.getDynamoExistentExpectation(lease.getLeaseKey()));

Map<String, AttributeValueUpdate> updates = serializer.getDynamoUpdateLeaseUpdate(lease, updateField);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
request.setAttributeUpdates(updates);

try {