@@ -26,7 +26,6 @@ package replication
26
26
27
27
import (
28
28
"context"
29
- "fmt"
30
29
"sync"
31
30
"sync/atomic"
32
31
"time"
72
71
logger log.Logger
73
72
74
73
taskProcessorLock sync.RWMutex
75
- taskProcessors map [string ]TaskProcessor
74
+ taskProcessors map [string ][] TaskProcessor // cluster name - processor
76
75
minTxAckedTaskID int64
77
76
shutdownChan chan struct {}
78
77
}
@@ -114,7 +113,7 @@ func NewTaskProcessorManager(
114
113
),
115
114
logger : shard .GetLogger (),
116
115
metricsHandler : shard .GetMetricsHandler (),
117
- taskProcessors : make (map [string ]TaskProcessor ),
116
+ taskProcessors : make (map [string ][] TaskProcessor ),
118
117
taskExecutorProvider : taskExecutorProvider ,
119
118
taskPollerManager : newPollerManager (shard .GetShardID (), shard .GetClusterMetadata ()),
120
119
minTxAckedTaskID : persistence .EmptyQueueMessageID ,
@@ -149,8 +148,10 @@ func (r *taskProcessorManagerImpl) Stop() {
149
148
150
149
r .shard .GetClusterMetadata ().UnRegisterMetadataChangeCallback (r )
151
150
r .taskProcessorLock .Lock ()
152
- for _ , replicationTaskProcessor := range r .taskProcessors {
153
- replicationTaskProcessor .Stop ()
151
+ for _ , taskProcessors := range r .taskProcessors {
152
+ for _ , processor := range taskProcessors {
153
+ processor .Stop ()
154
+ }
154
155
}
155
156
r .taskProcessorLock .Unlock ()
156
157
}
@@ -170,44 +171,57 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
170
171
r .taskProcessorLock .Lock ()
171
172
defer r .taskProcessorLock .Unlock ()
172
173
currentClusterName := r .shard .GetClusterMetadata ().GetCurrentClusterName ()
174
+ // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
175
+ // The callback covers three cases:
176
+ // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata(1 + 2).
177
+
178
+ // Case 1 and Case 3
173
179
for clusterName := range oldClusterMetadata {
174
180
if clusterName == currentClusterName {
175
181
continue
176
182
}
177
- sourceShardIds := r .taskPollerManager .getSourceClusterShardIDs (clusterName )
183
+ for _ , processor := range r .taskProcessors [clusterName ] {
184
+ processor .Stop ()
185
+ delete (r .taskProcessors , clusterName )
186
+ }
187
+ }
188
+
189
+ // Case 2 and Case 3
190
+ for clusterName := range newClusterMetadata {
191
+ if clusterName == currentClusterName {
192
+ continue
193
+ }
194
+ if clusterInfo := newClusterMetadata [clusterName ]; clusterInfo == nil || ! clusterInfo .Enabled {
195
+ continue
196
+ }
197
+ sourceShardIds , err := r .taskPollerManager .getSourceClusterShardIDs (clusterName )
198
+ if err != nil {
199
+ r .logger .Error ("Failed to get source shard id list" , tag .Error (err ), tag .ClusterName (clusterName ))
200
+ continue
201
+ }
202
+ var processors []TaskProcessor
178
203
for _ , sourceShardId := range sourceShardIds {
179
- perShardTaskProcessorKey := fmt .Sprintf (clusterCallbackKey , clusterName , sourceShardId )
180
- // The metadata triggers an update when the following fields update: 1. Enabled 2. Initial Failover Version 3. Cluster address
181
- // The callback covers three cases:
182
- // Case 1: Remove a cluster Case 2: Add a new cluster Case 3: Refresh cluster metadata.
183
- if processor , ok := r .taskProcessors [perShardTaskProcessorKey ]; ok {
184
- // Case 1 and Case 3
185
- processor .Stop ()
186
- delete (r .taskProcessors , perShardTaskProcessorKey )
187
- }
188
- if clusterInfo := newClusterMetadata [clusterName ]; clusterInfo != nil && clusterInfo .Enabled {
189
- // Case 2 and Case 3
190
- fetcher := r .replicationTaskFetcherFactory .GetOrCreateFetcher (clusterName )
191
- replicationTaskProcessor := NewTaskProcessor (
192
- sourceShardId ,
193
- r .shard ,
194
- r .engine ,
195
- r .config ,
196
- r .shard .GetMetricsHandler (),
197
- fetcher ,
198
- r .taskExecutorProvider (TaskExecutorParams {
199
- RemoteCluster : clusterName ,
200
- Shard : r .shard ,
201
- HistoryResender : r .resender ,
202
- DeleteManager : r .deleteMgr ,
203
- WorkflowCache : r .workflowCache ,
204
- }),
205
- r .eventSerializer ,
206
- )
207
- replicationTaskProcessor .Start ()
208
- r .taskProcessors [perShardTaskProcessorKey ] = replicationTaskProcessor
209
- }
204
+ fetcher := r .replicationTaskFetcherFactory .GetOrCreateFetcher (clusterName )
205
+ replicationTaskProcessor := NewTaskProcessor (
206
+ sourceShardId ,
207
+ r .shard ,
208
+ r .engine ,
209
+ r .config ,
210
+ r .shard .GetMetricsHandler (),
211
+ fetcher ,
212
+ r .taskExecutorProvider (TaskExecutorParams {
213
+ RemoteCluster : clusterName ,
214
+ Shard : r .shard ,
215
+ HistoryResender : r .resender ,
216
+ DeleteManager : r .deleteMgr ,
217
+ WorkflowCache : r .workflowCache ,
218
+ }),
219
+ r .eventSerializer ,
220
+ )
221
+ replicationTaskProcessor .Start ()
222
+ processors = append (processors , replicationTaskProcessor )
210
223
}
224
+ r .taskProcessors [clusterName ] = processors
211
225
}
212
226
}
213
227
0 commit comments