
Commit ae223cf

Filter replication task on source cluster (#3641)
1 parent 4c4ef46 commit ae223cf

5 files changed: +337 -210 lines changed


common/dynamicconfig/constants.go (+2)

@@ -555,6 +555,8 @@ const (
 
     // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
     ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
+    // ReplicatorMaxSkipTaskCount is maximum number of tasks that can be skipped during tasks pagination due to not meeting filtering conditions (e.g. missed namespace).
+    ReplicatorMaxSkipTaskCount = "history.replicatorMaxSkipTaskCount"
     // ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor
     ReplicatorTaskWorkerCount = "history.replicatorTaskWorkerCount"
     // ReplicatorTaskMaxRetryCount is max times of retry for ReplicatorProcessor
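The new key is read through the history service's dynamic config, so it can be tuned without a redeploy. A minimal sketch of an override entry, assuming the standard file-based dynamic config YAML format; the value shown is illustrative, and the in-code default registered in config.go below is 250:

    # hypothetical dynamic config override file; value is illustrative, not a recommendation
    history.replicatorMaxSkipTaskCount:
      - value: 500
        constraints: {}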

common/namespace/namespace.go (+10)

@@ -177,6 +177,16 @@ func (ns *Namespace) ClusterNames() []string {
     return out
 }
 
+// IsOnCluster returns true is namespace is registered on cluster otherwise false.
+func (ns *Namespace) IsOnCluster(clusterName string) bool {
+    for _, namespaceCluster := range ns.replicationConfig.Clusters {
+        if namespaceCluster == clusterName {
+            return true
+        }
+    }
+    return false
+}
+
 // ConfigVersion return the namespace config version
 func (ns *Namespace) ConfigVersion() int64 {
     return ns.configVersion
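IsOnCluster is a linear scan over the namespace's configured replication clusters. A self-contained sketch of the same check, with a plain string slice and hypothetical cluster names standing in for the namespace's replication config:

    package main

    import "fmt"

    // isOnCluster mirrors Namespace.IsOnCluster: report whether clusterName
    // appears in the namespace's configured replication clusters.
    func isOnCluster(clusters []string, clusterName string) bool {
        for _, c := range clusters {
            if c == clusterName {
                return true
            }
        }
        return false
    }

    func main() {
        clusters := []string{"cluster-a", "cluster-b"}  // hypothetical replication config
        fmt.Println(isOnCluster(clusters, "cluster-a")) // true  -> send replication tasks to this poller
        fmt.Println(isOnCluster(clusters, "cluster-c")) // false -> tasks for this namespace can be skipped
    }

The ack manager below uses this check to decide whether a task is worth converting and shipping to the polling cluster at all.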

service/history/configs/config.go (+2)

@@ -158,6 +158,7 @@ type Config struct {
     ReplicatorProcessorMaxReschedulerSize          dynamicconfig.IntPropertyFn
     ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
     ReplicatorProcessorFetchTasksBatchSize         dynamicconfig.IntPropertyFn
+    ReplicatorProcessorMaxSkipTaskCount            dynamicconfig.IntPropertyFn
 
     // System Limits
     MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn
@@ -418,6 +419,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
     ReplicatorProcessorMaxReschedulerSize:          dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000),
     ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
     ReplicatorProcessorFetchTasksBatchSize:         dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
+    ReplicatorProcessorMaxSkipTaskCount:            dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
     ReplicationTaskProcessorHostQPS:                dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
     ReplicationTaskProcessorShardQPS:               dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),
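One consequence of storing dynamicconfig.IntPropertyFn values, rather than calling them once, is that the ack manager below re-reads the setting on every use, so a dynamic config override takes effect without a restart. A self-contained sketch of that difference, using a local stand-in type instead of the real dynamicconfig.Collection:

    package main

    import "fmt"

    // intPropertyFn mimics dynamicconfig.IntPropertyFn: a func() int re-evaluated on every call.
    type intPropertyFn func() int

    func main() {
        current := 25
        pageSizeFn := intPropertyFn(func() int { return current })

        frozen := pageSizeFn() // old style: value captured once at construction time
        current = 100          // simulate an operator raising the setting at runtime

        fmt.Println(frozen)       // 25  -- never sees the update
        fmt.Println(pageSizeFn()) // 100 -- the stored function picks up the new value
    }

This is why ack_manager.go (next file) changes pageSize from int to dynamicconfig.IntPropertyFn and assigns config.ReplicatorProcessorFetchTasksBatchSize without the trailing ().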

service/history/replication/ack_manager.go (+63 -30)

@@ -43,6 +43,7 @@ import (
     "go.temporal.io/server/common/collection"
     "go.temporal.io/server/common/convert"
     "go.temporal.io/server/common/definition"
+    "go.temporal.io/server/common/dynamicconfig"
     "go.temporal.io/server/common/log"
     "go.temporal.io/server/common/log/tag"
     "go.temporal.io/server/common/metrics"
@@ -74,7 +75,9 @@ type (
         metricsHandler    metrics.MetricsHandler
         logger            log.Logger
         retryPolicy       backoff.RetryPolicy
-        pageSize          int
+        namespaceRegistry namespace.Registry
+        pageSize          dynamicconfig.IntPropertyFn
+        maxSkipTaskCount  dynamicconfig.IntPropertyFn
 
         sync.Mutex
         // largest replication task ID generated
@@ -86,7 +89,6 @@ type (
 
 var (
     errUnknownReplicationTask = serviceerror.NewInternal("unknown replication task")
-    emptyReplicationTasks     = []*replicationspb.ReplicationTask{}
 )
 
 func NewAckManager(
@@ -112,7 +114,9 @@ func NewAckManager(
         metricsHandler:    shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ReplicatorQueueProcessorScope)),
         logger:            log.With(logger, tag.ComponentReplicatorQueue),
         retryPolicy:       retryPolicy,
-        pageSize:          config.ReplicatorProcessorFetchTasksBatchSize(),
+        namespaceRegistry: shard.GetNamespaceRegistry(),
+        pageSize:          config.ReplicatorProcessorFetchTasksBatchSize,
+        maxSkipTaskCount:  config.ReplicatorProcessorMaxSkipTaskCount,
 
         maxTaskID:       nil,
         sanityCheckTime: time.Time{},
@@ -224,9 +228,9 @@ func (p *ackMgrImpl) GetTasks(
     minTaskID, maxTaskID := p.taskIDsRange(queryMessageID)
     replicationTasks, lastTaskID, err := p.getTasks(
         ctx,
+        pollingCluster,
         minTaskID,
         maxTaskID,
-        p.pageSize,
     )
     if err != nil {
         return nil, err
@@ -255,49 +259,65 @@ func (p *ackMgrImpl) GetTasks(
 
 func (p *ackMgrImpl) getTasks(
     ctx context.Context,
+    pollingCluster string,
     minTaskID int64,
     maxTaskID int64,
-    batchSize int,
 ) ([]*replicationspb.ReplicationTask, int64, error) {
     if minTaskID > maxTaskID {
-        return nil, 0, serviceerror.NewUnavailable("min task ID < max task ID, probably due to shard re-balancing")
+        return nil, 0, serviceerror.NewUnavailable("min task ID > max task ID, probably due to shard re-balancing")
     } else if minTaskID == maxTaskID {
-        return []*replicationspb.ReplicationTask{}, maxTaskID, nil
+        return nil, maxTaskID, nil
     }
 
-    replicationTasks := make([]*replicationspb.ReplicationTask, 0, batchSize)
-    iter := collection.NewPagingIterator(p.getPaginationFn(ctx, minTaskID, maxTaskID, batchSize))
-    for iter.HasNext() && len(replicationTasks) < batchSize {
+    replicationTasks := make([]*replicationspb.ReplicationTask, 0, p.pageSize())
+    skippedTaskCount := 0
+    lastTaskID := maxTaskID // If no tasks are returned, then it means there are no tasks bellow maxTaskID.
+    iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, minTaskID, maxTaskID, p.pageSize()))
+    // iter.HasNext() should be the last check to avoid extra page read in case if replicationTasks is already full.
+    for len(replicationTasks) < p.pageSize() && skippedTaskCount <= p.maxSkipTaskCount() && iter.HasNext() {
         task, err := iter.Next()
         if err != nil {
-            p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err))
-            if len(replicationTasks) == 0 {
-                return nil, 0, err
-            } else {
-                return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
-            }
+            return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
         }
 
-        if replicationTask, err := p.toReplicationTask(ctx, task); err != nil {
-            p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err))
-            if len(replicationTasks) == 0 {
-                return nil, 0, err
-            } else {
-                return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
+        // If, for any reason, task is skipped:
+        //  - lastTaskID needs to be updated because this task should not be read next time,
+        //  - skippedTaskCount needs to be incremented to prevent timeout on caller side (too many tasks are skipped).
+        // If error has occurred though, lastTaskID shouldn't be updated, and next time task needs to be read again.
+
+        ns, err := p.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
+        if err != nil {
+            if _, isNotFound := err.(*serviceerror.NamespaceNotFound); !isNotFound {
+                return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
             }
-        } else if replicationTask != nil {
-            replicationTasks = append(replicationTasks, replicationTask)
+            // Namespace doesn't exist on this cluster (i.e. deleted). It is safe to skip the task.
+            lastTaskID = task.GetTaskID()
+            skippedTaskCount++
+            continue
+        }
+        // If namespace doesn't exist on polling cluster, there is no reason to send the task.
+        if !ns.IsOnCluster(pollingCluster) {
+            lastTaskID = task.GetTaskID()
+            skippedTaskCount++
+            continue
         }
-    }
 
-    if len(replicationTasks) == 0 {
-        return emptyReplicationTasks, maxTaskID, nil
-    } else {
-        return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil
+        replicationTask, err := p.toReplicationTask(ctx, task)
+        if err != nil {
+            return p.swallowPartialResultsError(replicationTasks, lastTaskID, err)
+        } else if replicationTask == nil {
+            lastTaskID = task.GetTaskID()
+            skippedTaskCount++
+            continue
+        }
+        lastTaskID = task.GetTaskID()
+        replicationTasks = append(replicationTasks, replicationTask)
     }
+
+    return replicationTasks, lastTaskID, nil
 }
 
-func (p *ackMgrImpl) getPaginationFn(
+func (p *ackMgrImpl) getReplicationTasksFn(
     ctx context.Context,
     minTaskID int64,
     maxTaskID int64,
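The heart of the hunk above is the skip-bounded read loop: a task that is filtered out still advances lastTaskID (so it is acked and never re-read), while skippedTaskCount caps how long a single GetTasks call can spend on tasks that will never be sent. A stripped-down, self-contained sketch of that pattern, with hypothetical task and filter types (rawTask, readLoop, and keep are illustrative, not server types):

    package main

    import "fmt"

    // rawTask stands in for a persisted replication task.
    type rawTask struct {
        id        int64
        namespace string
    }

    // readLoop mirrors the shape of getTasks: keep tasks whose namespace passes the
    // filter, bound the number of skips, and always report the last task ID that was
    // fully handled so the caller can ack past skipped tasks.
    func readLoop(tasks []rawTask, keep func(string) bool, pageSize, maxSkip int) (out []int64, lastTaskID int64) {
        skipped := 0
        for _, t := range tasks {
            if len(out) >= pageSize || skipped > maxSkip {
                break
            }
            lastTaskID = t.id // skipped or kept, this task never needs to be read again
            if !keep(t.namespace) {
                skipped++ // bounds the time spent on filtered-out tasks
                continue
            }
            out = append(out, t.id)
        }
        return out, lastTaskID
    }

    func main() {
        tasks := []rawTask{{1, "ns-local"}, {2, "ns-remote"}, {3, "ns-remote"}, {4, "ns-local"}}
        keep := func(ns string) bool { return ns == "ns-remote" } // namespace registered on polling cluster
        ids, last := readLoop(tasks, keep, 25, 250)
        fmt.Println(ids, last) // [2 3] 4
    }

Note the ordering detail called out in the diff: the real loop checks iter.HasNext() last, so a full page never triggers an extra persistence read.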
@@ -319,6 +339,19 @@ func (p *ackMgrImpl) getPaginationFn(
     }
 }
 
+func (p *ackMgrImpl) swallowPartialResultsError(
+    replicationTasks []*replicationspb.ReplicationTask,
+    lastTaskID int64,
+    err error,
+) ([]*replicationspb.ReplicationTask, int64, error) {
+
+    p.logger.Error("Replication tasks reader encountered error, return earlier.", tag.Error(err), tag.Value(len(replicationTasks)))
+    if len(replicationTasks) == 0 {
+        return nil, 0, err
+    }
+    return replicationTasks, lastTaskID, nil
+}
+
 func (p *ackMgrImpl) taskIDsRange(
     lastReadMessageID int64,
 ) (minTaskID int64, maxTaskID int64) {
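The new swallowPartialResultsError helper replaces the two copy-pasted error branches from the old getTasks: an error on an empty page is propagated, while an error after some tasks were read is logged and the partial page is returned together with the last handled task ID, so the poller still makes progress. A self-contained sketch of that contract (partialOrError is a hypothetical stand-in, with fmt.Println in place of the structured logger):

    package main

    import (
        "errors"
        "fmt"
    )

    // partialOrError mirrors swallowPartialResultsError: surface the error only when
    // nothing was read; otherwise log it and return the partial page so the caller
    // can retry from lastTaskID on the next poll.
    func partialOrError(page []int64, lastTaskID int64, err error) ([]int64, int64, error) {
        if len(page) == 0 {
            return nil, 0, err
        }
        fmt.Println("returning partial page after error:", err)
        return page, lastTaskID, nil
    }

    func main() {
        readErr := errors.New("persistence unavailable")

        _, _, err := partialOrError(nil, 0, readErr)
        fmt.Println(err) // persistence unavailable -- empty page, error surfaces

        page, last, err := partialOrError([]int64{7, 9}, 9, readErr)
        fmt.Println(page, last, err) // [7 9] 9 <nil> -- partial results are kept
    }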

0 commit comments
