Skip to content

Commit fc6efef

Browse files
authored
Remove failover levels (#3854)
1 parent d1c5d10 commit fc6efef

34 files changed

+159
-337
lines changed

common/metrics/metric_defs.go

-2
Original file line numberDiff line numberDiff line change
@@ -1412,8 +1412,6 @@ var (
14121412
ShardInfoVisibilityLagHistogram = NewDimensionlessHistogramDef("shardinfo_visibility_lag")
14131413
ShardInfoImmediateQueueLagHistogram = NewDimensionlessHistogramDef("shardinfo_immediate_queue_lag")
14141414
ShardInfoScheduledQueueLagTimer = NewTimerDef("shardinfo_scheduled_queue_lag")
1415-
ShardInfoTransferFailoverLatencyTimer = NewTimerDef("shardinfo_transfer_failover_latency")
1416-
ShardInfoTimerFailoverLatencyTimer = NewTimerDef("shardinfo_timer_failover_latency")
14171415
SyncShardFromRemoteCounter = NewCounterDef("syncshard_remote_count")
14181416
SyncShardFromRemoteFailure = NewCounterDef("syncshard_remote_failed")
14191417
TaskRequests = NewCounterDef("task_requests")

common/persistence/dataInterfaces.go

-17
Original file line numberDiff line numberDiff line change
@@ -163,23 +163,6 @@ type (
163163
Msg string
164164
}
165165

166-
// ShardInfoWithFailover describes a shard
167-
ShardInfoWithFailover struct {
168-
*persistencespb.ShardInfo
169-
FailoverLevels map[tasks.Category]map[string]FailoverLevel // uuid -> FailoverLevel
170-
}
171-
172-
// FailoverLevel contains corresponding start / end level
173-
// TODO: remove FailoverLevel definition, they are only used by
174-
// old queue processing logic
175-
FailoverLevel struct {
176-
StartTime time.Time
177-
MinLevel tasks.Key
178-
CurrentLevel tasks.Key
179-
MaxLevel tasks.Key
180-
NamespaceIDs map[string]struct{}
181-
}
182-
183166
// TaskQueueKey is the struct used to identity TaskQueues
184167
TaskQueueKey struct {
185168
NamespaceID string

service/history/historyEngine2_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,10 @@ func (s *engine2Suite) SetupTest() {
135135
s.config = tests.NewDynamicConfig()
136136
mockShard := shard.NewTestContext(
137137
s.controller,
138-
&persistence.ShardInfoWithFailover{
139-
ShardInfo: &persistencespb.ShardInfo{
140-
ShardId: 1,
141-
RangeId: 1,
142-
}},
138+
&persistencespb.ShardInfo{
139+
ShardId: 1,
140+
RangeId: 1,
141+
},
143142
s.config,
144143
)
145144
s.mockShard = mockShard

service/history/historyEngine3_eventsv2_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ func (s *engine3Suite) SetupTest() {
112112

113113
s.mockShard = shard.NewTestContext(
114114
s.controller,
115-
&persistence.ShardInfoWithFailover{ShardInfo: &persistencespb.ShardInfo{
115+
&persistencespb.ShardInfo{
116116
ShardId: 1,
117117
RangeId: 1,
118-
}},
118+
},
119119
s.config,
120120
)
121121
s.mockShard.Resource.ShardMgr.EXPECT().AssertShardOwnership(gomock.Any(), gomock.Any()).AnyTimes()

service/history/historyEngine_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,10 @@ func (s *engineSuite) SetupTest() {
142142
s.config = tests.NewDynamicConfig()
143143
s.mockShard = shard.NewTestContext(
144144
s.controller,
145-
&persistence.ShardInfoWithFailover{
146-
ShardInfo: &persistencespb.ShardInfo{
147-
ShardId: 1,
148-
RangeId: 1,
149-
}},
145+
&persistencespb.ShardInfo{
146+
ShardId: 1,
147+
RangeId: 1,
148+
},
150149
s.config,
151150
)
152151
s.workflowCache = wcache.NewCache(s.mockShard)

service/history/ndc/activity_replicator_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,10 @@ func (s *activityReplicatorSuite) SetupTest() {
104104
s.mockTimerProcessor.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
105105
s.mockShard = shard.NewTestContext(
106106
s.controller,
107-
&persistence.ShardInfoWithFailover{
108-
ShardInfo: &persistencespb.ShardInfo{
109-
ShardId: 1,
110-
RangeId: 1,
111-
}},
107+
&persistencespb.ShardInfo{
108+
ShardId: 1,
109+
RangeId: 1,
110+
},
112111
tests.NewDynamicConfig(),
113112
)
114113
s.workflowCache = wcache.NewCache(s.mockShard).(*wcache.CacheImpl)

service/history/ndc/branch_manager_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,10 @@ func (s *branchMgrSuite) SetupTest() {
8686

8787
s.mockShard = shard.NewTestContext(
8888
s.controller,
89-
&persistence.ShardInfoWithFailover{
90-
ShardInfo: &persistencespb.ShardInfo{
91-
ShardId: 10,
92-
RangeId: 1,
93-
}},
89+
&persistencespb.ShardInfo{
90+
ShardId: 10,
91+
RangeId: 1,
92+
},
9493
tests.NewDynamicConfig(),
9594
)
9695

service/history/ndc/conflict_resolver_test.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"go.temporal.io/server/common/convert"
3939
"go.temporal.io/server/common/definition"
4040
"go.temporal.io/server/common/log"
41-
"go.temporal.io/server/common/persistence"
4241
"go.temporal.io/server/common/persistence/versionhistory"
4342
"go.temporal.io/server/service/history/shard"
4443
"go.temporal.io/server/service/history/tests"
@@ -82,11 +81,10 @@ func (s *conflictResolverSuite) SetupTest() {
8281

8382
s.mockShard = shard.NewTestContext(
8483
s.controller,
85-
&persistence.ShardInfoWithFailover{
86-
ShardInfo: &persistencespb.ShardInfo{
87-
ShardId: 10,
88-
RangeId: 1,
89-
}},
84+
&persistencespb.ShardInfo{
85+
ShardId: 10,
86+
RangeId: 1,
87+
},
9088
tests.NewDynamicConfig(),
9189
)
9290

service/history/ndc/history_replicator_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,10 @@ func (s *historyReplicatorSuite) SetupTest() {
9494

9595
s.mockShard = shard.NewTestContext(
9696
s.controller,
97-
&persistence.ShardInfoWithFailover{
98-
ShardInfo: &persistencespb.ShardInfo{
99-
ShardId: 10,
100-
RangeId: 1,
101-
}},
97+
&persistencespb.ShardInfo{
98+
ShardId: 10,
99+
RangeId: 1,
100+
},
102101
tests.NewDynamicConfig(),
103102
)
104103

service/history/ndc/resetter_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,10 @@ func (s *resetterSuite) SetupTest() {
9191

9292
s.mockShard = shard.NewTestContext(
9393
s.controller,
94-
&persistence.ShardInfoWithFailover{
95-
ShardInfo: &persistencespb.ShardInfo{
96-
ShardId: 10,
97-
RangeId: 1,
98-
}},
94+
&persistencespb.ShardInfo{
95+
ShardId: 10,
96+
RangeId: 1,
97+
},
9998
tests.NewDynamicConfig(),
10099
)
101100

service/history/ndc/state_rebuilder_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,10 @@ func (s *stateRebuilderSuite) SetupTest() {
9494

9595
s.mockShard = shard.NewTestContext(
9696
s.controller,
97-
&persistence.ShardInfoWithFailover{
98-
ShardInfo: &persistencespb.ShardInfo{
99-
ShardId: 10,
100-
RangeId: 1,
101-
}},
97+
&persistencespb.ShardInfo{
98+
ShardId: 10,
99+
RangeId: 1,
100+
},
102101
tests.NewDynamicConfig(),
103102
)
104103

service/history/ndc/transaction_manager_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,10 @@ func (s *transactionMgrSuite) SetupTest() {
8888

8989
s.mockShard = shard.NewTestContext(
9090
s.controller,
91-
&persistence.ShardInfoWithFailover{
92-
ShardInfo: &persistencespb.ShardInfo{
93-
ShardId: 10,
94-
RangeId: 1,
95-
}},
91+
&persistencespb.ShardInfo{
92+
ShardId: 10,
93+
RangeId: 1,
94+
},
9695
tests.NewDynamicConfig(),
9796
)
9897

service/history/ndc/workflow_resetter_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,10 @@ func (s *workflowResetterSuite) SetupTest() {
9898

9999
s.mockShard = shard.NewTestContext(
100100
s.controller,
101-
&persistence.ShardInfoWithFailover{
102-
ShardInfo: &persistencespb.ShardInfo{
103-
ShardId: 0,
104-
RangeId: 1,
105-
}},
101+
&persistencespb.ShardInfo{
102+
ShardId: 0,
103+
RangeId: 1,
104+
},
106105
tests.NewDynamicConfig(),
107106
)
108107
s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr

service/history/queues/queue_base_test.go

+37-51
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,12 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() {
120120

121121
mockShard := shard.NewTestContext(
122122
s.controller,
123-
&persistence.ShardInfoWithFailover{
124-
ShardInfo: &persistencespb.ShardInfo{
125-
ShardId: 0,
126-
RangeId: rangeID,
127-
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
128-
tasks.CategoryIDTransfer: {
129-
AckLevel: ackLevel,
130-
},
123+
&persistencespb.ShardInfo{
124+
ShardId: 0,
125+
RangeId: rangeID,
126+
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
127+
tasks.CategoryIDTransfer: {
128+
AckLevel: ackLevel,
131129
},
132130
},
133131
},
@@ -207,13 +205,11 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
207205

208206
mockShard := shard.NewTestContext(
209207
s.controller,
210-
&persistence.ShardInfoWithFailover{
211-
ShardInfo: &persistencespb.ShardInfo{
212-
ShardId: 0,
213-
RangeId: 10,
214-
QueueStates: map[int32]*persistencespb.QueueState{
215-
tasks.CategoryIDTransfer: persistenceState,
216-
},
208+
&persistencespb.ShardInfo{
209+
ShardId: 0,
210+
RangeId: 10,
211+
QueueStates: map[int32]*persistencespb.QueueState{
212+
tasks.CategoryIDTransfer: persistenceState,
217213
},
218214
},
219215
s.config,
@@ -248,14 +244,12 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
248244
func (s *queueBaseSuite) TestStartStop() {
249245
mockShard := shard.NewTestContext(
250246
s.controller,
251-
&persistence.ShardInfoWithFailover{
252-
ShardInfo: &persistencespb.ShardInfo{
253-
ShardId: 0,
254-
RangeId: 10,
255-
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
256-
tasks.CategoryIDTransfer: {
257-
AckLevel: 1024,
258-
},
247+
&persistencespb.ShardInfo{
248+
ShardId: 0,
249+
RangeId: 10,
250+
QueueAckLevels: map[int32]*persistencespb.QueueAckLevel{
251+
tasks.CategoryIDTransfer: {
252+
AckLevel: 1024,
259253
},
260254
},
261255
},
@@ -318,13 +312,11 @@ func (s *queueBaseSuite) TestProcessNewRange() {
318312

319313
mockShard := shard.NewTestContext(
320314
s.controller,
321-
&persistence.ShardInfoWithFailover{
322-
ShardInfo: &persistencespb.ShardInfo{
323-
ShardId: 0,
324-
RangeId: 10,
325-
QueueStates: map[int32]*persistencespb.QueueState{
326-
tasks.CategoryIDTimer: persistenceState,
327-
},
315+
&persistencespb.ShardInfo{
316+
ShardId: 0,
317+
RangeId: 10,
318+
QueueStates: map[int32]*persistencespb.QueueState{
319+
tasks.CategoryIDTimer: persistenceState,
328320
},
329321
},
330322
s.config,
@@ -375,13 +367,11 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
375367

376368
mockShard := shard.NewTestContext(
377369
s.controller,
378-
&persistence.ShardInfoWithFailover{
379-
ShardInfo: &persistencespb.ShardInfo{
380-
ShardId: 0,
381-
RangeId: 10,
382-
QueueStates: map[int32]*persistencespb.QueueState{
383-
tasks.CategoryIDTimer: persistenceState,
384-
},
370+
&persistencespb.ShardInfo{
371+
ShardId: 0,
372+
RangeId: 10,
373+
QueueStates: map[int32]*persistencespb.QueueState{
374+
tasks.CategoryIDTimer: persistenceState,
385375
},
386376
},
387377
s.config,
@@ -449,13 +439,11 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
449439

450440
mockShard := shard.NewTestContext(
451441
s.controller,
452-
&persistence.ShardInfoWithFailover{
453-
ShardInfo: &persistencespb.ShardInfo{
454-
ShardId: 0,
455-
RangeId: 10,
456-
QueueStates: map[int32]*persistencespb.QueueState{
457-
tasks.CategoryIDTimer: persistenceState,
458-
},
442+
&persistencespb.ShardInfo{
443+
ShardId: 0,
444+
RangeId: 10,
445+
QueueStates: map[int32]*persistencespb.QueueState{
446+
tasks.CategoryIDTimer: persistenceState,
459447
},
460448
},
461449
s.config,
@@ -538,13 +526,11 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
538526

539527
mockShard := shard.NewTestContext(
540528
s.controller,
541-
&persistence.ShardInfoWithFailover{
542-
ShardInfo: &persistencespb.ShardInfo{
543-
ShardId: 0,
544-
RangeId: 10,
545-
QueueStates: map[int32]*persistencespb.QueueState{
546-
tasks.CategoryIDTimer: initialPersistenceState,
547-
},
529+
&persistencespb.ShardInfo{
530+
ShardId: 0,
531+
RangeId: 10,
532+
QueueStates: map[int32]*persistencespb.QueueState{
533+
tasks.CategoryIDTimer: initialPersistenceState,
548534
},
549535
},
550536
s.config,

service/history/queues/queue_scheduled_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,9 @@ func (s *scheduledQueueSuite) SetupTest() {
7373
s.controller = gomock.NewController(s.T())
7474
s.mockShard = shard.NewTestContext(
7575
s.controller,
76-
&persistence.ShardInfoWithFailover{
77-
ShardInfo: &persistencespb.ShardInfo{
78-
ShardId: 0,
79-
RangeId: 1,
80-
},
76+
&persistencespb.ShardInfo{
77+
ShardId: 0,
78+
RangeId: 1,
8179
},
8280
tests.NewDynamicConfig(),
8381
)

service/history/replication/ack_manager_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,10 @@ func (s *ackManagerSuite) SetupTest() {
9898

9999
s.mockShard = shard.NewTestContext(
100100
s.controller,
101-
&persistence.ShardInfoWithFailover{
102-
ShardInfo: &persistencespb.ShardInfo{
103-
ShardId: 0,
104-
RangeId: 1,
105-
}},
101+
&persistencespb.ShardInfo{
102+
ShardId: 0,
103+
RangeId: 1,
104+
},
106105
tests.NewDynamicConfig(),
107106
)
108107

service/history/replication/dlq_handler_test.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,11 @@ func (s *dlqHandlerSuite) SetupTest() {
9494

9595
s.mockShard = shard.NewTestContext(
9696
s.controller,
97-
&persistence.ShardInfoWithFailover{
98-
ShardInfo: &persistencespb.ShardInfo{
99-
ShardId: 0,
100-
RangeId: 1,
101-
ReplicationDlqAckLevel: map[string]int64{cluster.TestAlternativeClusterName: persistence.EmptyQueueMessageID},
102-
}},
97+
&persistencespb.ShardInfo{
98+
ShardId: 0,
99+
RangeId: 1,
100+
ReplicationDlqAckLevel: map[string]int64{cluster.TestAlternativeClusterName: persistence.EmptyQueueMessageID},
101+
},
103102
tests.NewDynamicConfig(),
104103
)
105104
s.mockResource = s.mockShard.Resource

0 commit comments

Comments
 (0)