Skip to content

Commit 8ca4b2c

Browse files
authored
Handle data loss error in replication read history (#3704)
* Bypass data corruption error in replication
1 parent 0f27ed2 commit 8ca4b2c

File tree

4 files changed

+33
-5
lines changed

4 files changed

+33
-5
lines changed

common/dynamicconfig/constants.go

+2
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,8 @@ const (
663663
ReplicationTaskProcessorHostQPS = "history.ReplicationTaskProcessorHostQPS"
664664
// ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level
665665
ReplicationTaskProcessorShardQPS = "history.ReplicationTaskProcessorShardQPS"
666+
// ReplicationBypassCorruptedData is the flag to bypass corrupted workflow data in source cluster
667+
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
666668

667669
// keys for worker
668670

common/persistence/history_manager.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
enumspb "go.temporal.io/api/enums/v1"
3434
historypb "go.temporal.io/api/history/v1"
3535
"go.temporal.io/api/serviceerror"
36+
3637
"go.temporal.io/server/common/persistence/serialization"
3738

3839
persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -1159,9 +1160,9 @@ func (m *executionManagerImpl) filterHistoryNodes(
11591160

11601161
switch {
11611162
case node.NodeID < lastNodeID:
1162-
return nil, serviceerror.NewUnavailable("corrupted data, nodeID cannot decrease")
1163+
return nil, serviceerror.NewDataLoss("corrupted data, nodeID cannot decrease")
11631164
case node.NodeID == lastNodeID:
1164-
return nil, serviceerror.NewUnavailable("corrupted data, same nodeID must have smaller txnID")
1165+
return nil, serviceerror.NewDataLoss("corrupted data, same nodeID must have smaller txnID")
11651166
default: // row.NodeID > lastNodeID:
11661167
// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
11671168
lastTransactionID = node.TransactionID
@@ -1188,7 +1189,7 @@ func (m *executionManagerImpl) filterHistoryNodesReverse(
11881189

11891190
switch {
11901191
case node.NodeID > lastNodeID:
1191-
return nil, serviceerror.NewUnavailable("corrupted data, nodeID cannot decrease")
1192+
return nil, serviceerror.NewDataLoss("corrupted data, nodeID cannot decrease")
11921193
default:
11931194
lastTransactionID = node.PrevTransactionID
11941195
lastNodeID = node.NodeID

service/history/configs/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ type Config struct {
241241
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
242242
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
243243
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
244+
ReplicationBypassCorruptedData dynamicconfig.BoolPropertyFnWithNamespaceIDFilter
244245

245246
// The following are used by consistent query
246247
MaxBufferedQueryCount dynamicconfig.IntPropertyFn
@@ -422,6 +423,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
422423
ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
423424
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
424425
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),
426+
ReplicationBypassCorruptedData: dc.GetBoolPropertyFnWithNamespaceIDFilter(dynamicconfig.ReplicationBypassCorruptedData, false),
425427

426428
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
427429
MaximumSignalsPerExecution: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MaximumSignalsPerExecution, 0),

service/history/replication/ack_manager.go

+25-2
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ func (p *ackMgrImpl) generateHistoryReplicationTask(
485485
taskInfo.NextEventID,
486486
)
487487
if err != nil {
488-
return nil, err
488+
return nil, p.handleReadHistoryError(namespaceID, workflowID, runID, err)
489489
}
490490

491491
replicationTask := &replicationspb.ReplicationTask{
@@ -689,7 +689,7 @@ func (p *ackMgrImpl) processNewRunReplication(
689689
common.FirstEventID+1,
690690
)
691691
if err != nil {
692-
return nil, err
692+
return nil, p.handleReadHistoryError(namespaceID, workflowID, newRunID, err)
693693
}
694694
}
695695
attr.HistoryTaskAttributes.NewRunEvents = newRunEventsBlob
@@ -720,3 +720,26 @@ func getVersionHistoryItems(
720720
}
721721
return versionhistory.CopyVersionHistory(versionHistoryBranch).GetItems(), versionHistoryBranch.GetBranchToken(), nil
722722
}
723+
724+
func (p *ackMgrImpl) handleReadHistoryError(
725+
namespaceID namespace.ID,
726+
workflowID string,
727+
runID string,
728+
err error,
729+
) error {
730+
switch err.(type) {
731+
case *serviceerror.NotFound, *serviceerror.DataLoss:
732+
if p.config.ReplicationBypassCorruptedData(namespaceID.String()) {
733+
// bypass this corrupted workflow to unblock the replication queue.
734+
p.logger.Error("Cannot get history from corrupted workflow",
735+
tag.WorkflowNamespaceID(namespaceID.String()),
736+
tag.WorkflowID(workflowID),
737+
tag.WorkflowRunID(runID),
738+
tag.Error(err))
739+
return nil
740+
}
741+
return err
742+
default:
743+
return err
744+
}
745+
}

0 commit comments

Comments
 (0)