@@ -30,6 +30,7 @@ import (
30
30
"sync"
31
31
32
32
"go.uber.org/fx"
33
+ "go.uber.org/multierr"
33
34
34
35
"go.temporal.io/server/common/cluster"
35
36
"go.temporal.io/server/common/config"
@@ -95,8 +96,10 @@ func (s *ServerImpl) Start() error {
95
96
s .logger .Info ("Starting server for services" , tag .Value (s .so .serviceNames ))
96
97
s .logger .Debug (s .so .config .String ())
97
98
99
+ ctx := context .TODO ()
100
+
98
101
if err := initSystemNamespaces (
99
- context . TODO () ,
102
+ ctx ,
100
103
& s .persistenceConfig ,
101
104
s .clusterMetadata .CurrentClusterName ,
102
105
s .so .persistenceServiceResolver ,
@@ -107,30 +110,22 @@ func (s *ServerImpl) Start() error {
107
110
return fmt .Errorf ("unable to initialize system namespace: %w" , err )
108
111
}
109
112
110
- var wg sync.WaitGroup
111
- for _ , svcMeta := range s .servicesMetadata {
112
- wg .Add (1 )
113
- go func (svcMeta * ServicesMetadata ) {
114
- timeoutCtx , cancelFunc := context .WithTimeout (context .Background (), serviceStartTimeout )
115
- defer cancelFunc ()
116
- svcMeta .App .Start (timeoutCtx )
117
- wg .Done ()
118
- }(svcMeta )
113
+ if err := s .startServices (); err != nil {
114
+ return err
119
115
}
120
- wg .Wait ()
121
116
122
117
if s .so .blockingStart {
123
118
// If s.so.interruptCh is nil this will wait forever.
124
119
interruptSignal := <- s .so .interruptCh
125
120
s .logger .Info ("Received interrupt signal, stopping the server." , tag .Value (interruptSignal ))
126
- s .Stop ()
121
+ return s .Stop ()
127
122
}
128
123
129
124
return nil
130
125
}
131
126
132
127
// Stop stops the server.
133
- func (s * ServerImpl ) Stop () {
128
+ func (s * ServerImpl ) Stop () error {
134
129
var wg sync.WaitGroup
135
130
wg .Add (len (s .servicesMetadata ))
136
131
close (s .stoppedCh )
@@ -147,6 +142,38 @@ func (s *ServerImpl) Stop() {
147
142
if s .so .metricHandler != nil {
148
143
s .so .metricHandler .Stop (s .logger )
149
144
}
145
+ return nil
146
+ }
147
+
148
+ func (s * ServerImpl ) startServices () error {
149
+ ctx , cancel := context .WithTimeout (context .Background (), serviceStartTimeout )
150
+ defer cancel ()
151
+ results := make (chan startServiceResult , len (s .servicesMetadata ))
152
+ for _ , svcMeta := range s .servicesMetadata {
153
+ go func (svcMeta * ServicesMetadata ) {
154
+ err := svcMeta .App .Start (ctx )
155
+ results <- startServiceResult {
156
+ svc : svcMeta ,
157
+ err : err ,
158
+ }
159
+ }(svcMeta )
160
+ }
161
+ return s .readResults (results )
162
+ }
163
+
164
+ func (s * ServerImpl ) readResults (results chan startServiceResult ) (err error ) {
165
+ for i := 0 ; i < len (s .servicesMetadata ); i ++ {
166
+ r := <- results
167
+ if r .err != nil {
168
+ err = multierr .Combine (err , fmt .Errorf ("failed to start service %v: %w" , r .svc .ServiceName , r .err ))
169
+ }
170
+ }
171
+ return
172
+ }
173
+
174
+ type startServiceResult struct {
175
+ svc * ServicesMetadata
176
+ err error
150
177
}
151
178
152
179
func initSystemNamespaces (
0 commit comments