Skip to content

Commit 2d9bb16

Browse files
authored
Frontend health check waits until membership initialized (#3936)
1 parent 74bfaff commit 2d9bb16

File tree

8 files changed

+54
-4
lines changed

8 files changed

+54
-4
lines changed

common/membership/interfaces.go

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package membership
2828

2929
import (
30+
"context"
3031
"errors"
3132

3233
"go.temporal.io/api/serviceerror"
@@ -82,6 +83,11 @@ type (
8283
// GetMemberCount returns the number of reachable members
8384
// currently in this node's membership list for the given service
8485
GetMemberCount(service primitives.ServiceName) (int, error)
86+
// WaitUntilInitialized blocks until initialization is completed and returns the result
87+
// of initialization. The current implementation calls log.Fatal if it can't initialize,
88+
// so currently this never returns a non-nil error, except on context cancel/timeout. A
89+
// future implementation might return more errors.
90+
WaitUntilInitialized(context.Context) error
8591
}
8692

8793
// ServiceResolver provides membership information for a specific temporal service.

common/membership/interfaces_mock.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/membership/rpMonitor.go

+10
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"time"
3737

3838
"go.temporal.io/server/common/convert"
39+
"go.temporal.io/server/common/future"
3940
"go.temporal.io/server/common/headers"
4041
"go.temporal.io/server/common/primitives"
4142

@@ -69,6 +70,7 @@ type ringpopMonitor struct {
6970
metadataManager persistence.ClusterMetadataManager
7071
broadcastHostPortResolver func() (string, error)
7172
hostID uuid.UUID
73+
initialized *future.FutureImpl[struct{}]
7274
}
7375

7476
var _ Monitor = (*ringpopMonitor)(nil)
@@ -103,6 +105,7 @@ func NewRingpopMonitor(
103105
metadataManager: metadataManager,
104106
broadcastHostPortResolver: broadcastHostPortResolver,
105107
hostID: uuid.NewUUID(),
108+
initialized: future.NewFuture[struct{}](),
106109
}
107110
for service, port := range services {
108111
rpo.rings[service] = newRingpopServiceResolver(service, port, rp, logger)
@@ -152,6 +155,13 @@ func (rpo *ringpopMonitor) Start() {
152155
for _, ring := range rpo.rings {
153156
ring.Start()
154157
}
158+
159+
rpo.initialized.Set(struct{}{}, nil)
160+
}
161+
162+
// WaitUntilInitialized waits on the rpo.initialized future, which Start sets
// (with a nil error) after starting every ring. Only the error from Get is
// returned; the future's struct{} value carries no information.
// NOTE(review): Get presumably returns early with the context's error if ctx is
// cancelled or times out first — confirm against common/future.
func (rpo *ringpopMonitor) WaitUntilInitialized(ctx context.Context) error {
163+
_, err := rpo.initialized.Get(ctx)
164+
return err
155165
}
156166

157167
func ServiceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) {

common/resourcetest/resourceTest.go

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func NewTest(
166166
membershipMonitor.EXPECT().GetResolver(primitives.MatchingService).Return(matchingServiceResolver, nil).AnyTimes()
167167
membershipMonitor.EXPECT().GetResolver(primitives.HistoryService).Return(historyServiceResolver, nil).AnyTimes()
168168
membershipMonitor.EXPECT().GetResolver(primitives.WorkerService).Return(workerServiceResolver, nil).AnyTimes()
169+
membershipMonitor.EXPECT().WaitUntilInitialized(gomock.Any()).Return(nil).AnyTimes()
169170

170171
scope := tally.NewTestScope("test", nil)
171172
serviceName, _ := metrics.MetricsServiceIdxToServiceName(serviceMetricsIndex)

service/frontend/fx.go

+2
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ func HandlerProvider(
555555
clusterMetadata cluster.Metadata,
556556
archivalMetadata archiver.ArchivalMetadata,
557557
healthServer *health.Server,
558+
membershipMonitor membership.Monitor,
558559
) Handler {
559560
wfHandler := NewWorkflowHandler(
560561
serviceConfig,
@@ -576,6 +577,7 @@ func HandlerProvider(
576577
archivalMetadata,
577578
healthServer,
578579
timeSource,
580+
membershipMonitor,
579581
)
580582
return wfHandler
581583
}

service/frontend/workflow_handler.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ import (
6767
"go.temporal.io/server/common/headers"
6868
"go.temporal.io/server/common/log"
6969
"go.temporal.io/server/common/log/tag"
70+
"go.temporal.io/server/common/membership"
7071
"go.temporal.io/server/common/metrics"
7172
"go.temporal.io/server/common/namespace"
7273
"go.temporal.io/server/common/payload"
@@ -123,6 +124,7 @@ type (
123124
archivalMetadata archiver.ArchivalMetadata
124125
healthServer *health.Server
125126
overrides *Overrides
127+
membershipMonitor membership.Monitor
126128
}
127129
)
128130

@@ -147,6 +149,7 @@ func NewWorkflowHandler(
147149
archivalMetadata archiver.ArchivalMetadata,
148150
healthServer *health.Server,
149151
timeSource clock.TimeSource,
152+
membershipMonitor membership.Monitor,
150153
) *WorkflowHandler {
151154

152155
handler := &WorkflowHandler{
@@ -187,9 +190,10 @@ func NewWorkflowHandler(
187190
visibilityMrg.GetIndexName(),
188191
visibility.AllowListForValidation(visibilityMrg.GetName()),
189192
),
190-
archivalMetadata: archivalMetadata,
191-
healthServer: healthServer,
192-
overrides: NewOverrides(),
193+
archivalMetadata: archivalMetadata,
194+
healthServer: healthServer,
195+
overrides: NewOverrides(),
196+
membershipMonitor: membershipMonitor,
193197
}
194198

195199
return handler
@@ -202,7 +206,13 @@ func (wh *WorkflowHandler) Start() {
202206
common.DaemonStatusInitialized,
203207
common.DaemonStatusStarted,
204208
) {
205-
wh.healthServer.SetServingStatus(WorkflowServiceName, healthpb.HealthCheckResponse_SERVING)
209+
// Start in NOT_SERVING state and switch to SERVING after membership is ready
210+
wh.healthServer.SetServingStatus(WorkflowServiceName, healthpb.HealthCheckResponse_NOT_SERVING)
211+
go func() {
212+
_ = wh.membershipMonitor.WaitUntilInitialized(context.Background())
213+
wh.healthServer.SetServingStatus(WorkflowServiceName, healthpb.HealthCheckResponse_SERVING)
214+
wh.logger.Info("Frontend is now healthy")
215+
}()
206216
}
207217
}
208218

service/frontend/workflow_handler_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl
190190
s.mockResource.GetArchivalMetadata(),
191191
health.NewServer(),
192192
clock.NewRealTimeSource(),
193+
s.mockResource.GetMembershipMonitor(),
193194
)
194195
}
195196

tests/simpleMonitor.go

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
package tests
2626

2727
import (
28+
"context"
2829
"fmt"
2930

3031
"go.temporal.io/server/common/membership"
@@ -92,3 +93,7 @@ func (s *simpleMonitor) GetReachableMembers() ([]string, error) {
9293
// GetMemberCount always reports zero members and a nil error, regardless of
// the service name passed in.
func (s *simpleMonitor) GetMemberCount(service primitives.ServiceName) (int, error) {
9394
return 0, nil
9495
}
96+
97+
// WaitUntilInitialized returns nil immediately; the context argument is
// ignored and the call never blocks.
func (s *simpleMonitor) WaitUntilInitialized(_ context.Context) error {
98+
return nil
99+
}

0 commit comments

Comments
 (0)