Skip to content

Commit 8b6b2fb

Browse files
Keep queue factory group tag (#3880)
1 parent 6ef7749 commit 8b6b2fb

File tree

2 files changed

+38
-32
lines changed

2 files changed

+38
-32
lines changed

service/history/historyEngineFactory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type (
5555
NewCacheFn wcache.NewCacheFn
5656
ArchivalClient archiver.Client
5757
EventSerializer serialization.Serializer
58-
QueueFactories []QueueFactory
58+
QueueFactories []QueueFactory `group:"queueFactory"`
5959
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
6060
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
6161
TracerProvider trace.TracerProvider

service/history/queueFactoryBase.go

+37-31
Original file line numberDiff line numberDiff line change
@@ -89,63 +89,69 @@ type (
8989
fx.In
9090

9191
Lifecycle fx.Lifecycle
92-
Factories []QueueFactory
92+
Factories []QueueFactory `group:"queueFactory"`
9393
}
9494
)
9595

9696
var QueueModule = fx.Options(
9797
fx.Provide(QueueSchedulerRateLimiterProvider),
9898
fx.Provide(
9999
fx.Annotated{
100-
Name: "transferQueueFactory",
100+
Group: QueueFactoryFxGroup,
101101
Target: NewTransferQueueFactory,
102102
},
103103
fx.Annotated{
104-
Name: "timerQueueFactory",
104+
Group: QueueFactoryFxGroup,
105105
Target: NewTimerQueueFactory,
106106
},
107107
fx.Annotated{
108-
Name: "visibilityQueueFactory",
108+
Group: QueueFactoryFxGroup,
109109
Target: NewVisibilityQueueFactory,
110110
},
111-
fx.Annotated{
112-
Name: "archivalQueueFactory",
113-
Target: NewArchivalQueueFactory,
114-
},
115-
getQueueFactories,
111+
getOptionalQueueFactories,
116112
),
117113
fx.Invoke(QueueFactoryLifetimeHooks),
118114
)
119115

120-
type queueFactorySet struct {
121-
fx.In
122-
123-
TransferQueueFactory QueueFactory `name:"transferQueueFactory"`
124-
TimerQueueFactory QueueFactory `name:"timerQueueFactory"`
125-
VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"`
126-
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
116+
// additionalQueueFactories is a container for a list of queue factories that are only added to the group if
117+
// they are enabled. This exists because there is no way to conditionally add to a group with a provider that returns
118+
// a single object. For example, this doesn't work because it will always add the factory to the group, which can
119+
// cause NPEs:
120+
//
121+
// fx.Annotated{
122+
// Group: "queueFactory",
123+
// Target: func() QueueFactory { return isEnabled ? NewQueueFactory() : nil },
124+
// },
125+
type additionalQueueFactories struct {
126+
// This is what tells fx to add the factories to the group whenever this object is provided.
127+
fx.Out
128+
129+
// Factories is a list of queue factories that will be added to the `group:"queueFactory"` group.
130+
Factories []QueueFactory `group:"queueFactory,flatten"`
127131
}
128132

129-
// getQueueFactories returns factories for all the enabled queue types.
130-
// The archival queue factory is only returned when archival is enabled in the static config.
131-
func getQueueFactories(
132-
queueFactorySet queueFactorySet,
133+
// getOptionalQueueFactories returns an additionalQueueFactories which contains a list of queue factories that will be
134+
// added to the `group:"queueFactory"` group. The factories are added to the group only if they are enabled, which
135+
// is why we must return a list here.
136+
func getOptionalQueueFactories(
133137
archivalMetadata archiver.ArchivalMetadata,
134-
) []QueueFactory {
135-
factories := []QueueFactory{
136-
queueFactorySet.TransferQueueFactory,
137-
queueFactorySet.TimerQueueFactory,
138-
queueFactorySet.VisibilityQueueFactory,
139-
}
138+
params ArchivalQueueFactoryParams,
139+
) additionalQueueFactories {
140+
140141
c := tasks.CategoryArchival
141-
// this will only affect tests because this method is only called once in production,
142+
// Removing this category will only affect tests because this method is only called once in production,
142143
// but it may be called many times across test runs, which would leave the archival queue as a dangling category
143144
tasks.RemoveCategory(c.ID())
144-
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
145-
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
146-
tasks.NewCategory(c.ID(), c.Type(), c.Name())
145+
if archivalMetadata.GetHistoryConfig().StaticClusterState() != archiver.ArchivalEnabled &&
146+
archivalMetadata.GetVisibilityConfig().StaticClusterState() != archiver.ArchivalEnabled {
147+
return additionalQueueFactories{}
148+
}
149+
tasks.NewCategory(c.ID(), c.Type(), c.Name())
150+
return additionalQueueFactories{
151+
Factories: []QueueFactory{
152+
NewArchivalQueueFactory(params),
153+
},
147154
}
148-
return factories
149155
}
150156

151157
func QueueSchedulerRateLimiterProvider(

0 commit comments

Comments
 (0)