Commit de0b049

Fix visibility processor panic on add after stop (#3830)
* add guard clause
* cleanup
* wait for in progress adds before shutdown
* use RWLock
* concurrency test
* comments
* test
* define error
* remove nil assignment
* err name
* test
* tests
* reuse future
1 parent c257d84 · commit de0b049
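In short, Stop now takes the write side of a new shutdownLock before tearing down the bulk processor, while Add holds the read side and checks the daemon status, handing back a future that is already failed with errVisibilityShutdown instead of racing with the old code path that nilled out mapToAckFuture and bulkProcessor. A minimal, self-contained sketch of that shutdown-guard pattern (hypothetical names, not the Temporal types themselves) looks roughly like this:

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errShutdown = errors.New("processor was shut down")

type processor struct {
	status       int32 // 0 = started, 1 = stopped
	shutdownLock sync.RWMutex
}

// Add takes the read lock so Stop cannot tear the processor down mid-add,
// and rejects new work with an error once the status says stopped.
func (p *processor) Add(item string) error {
	p.shutdownLock.RLock()
	defer p.shutdownLock.RUnlock()

	if atomic.LoadInt32(&p.status) == 1 {
		return errShutdown // clean error instead of a panic on freed state
	}
	fmt.Println("added", item) // stand-in for submitting to the bulk processor
	return nil
}

// Stop flips the status, then waits for in-flight Adds (read-lock holders)
// by taking the write lock before releasing underlying resources.
func (p *processor) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, 0, 1) {
		return // already stopped
	}
	p.shutdownLock.Lock()
	defer p.shutdownLock.Unlock()
	// stand-in for bulkProcessor.Stop()
}

func main() {
	p := &processor{}
	_ = p.Add("before-stop")
	p.Stop()
	if err := p.Add("after-stop"); errors.Is(err, errShutdown) {
		fmt.Println("rejected:", err)
	}
}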

2 files changed: +61 -5 lines

common/persistence/visibility/store/elasticsearch/processor.go (+20 -3)
@@ -33,6 +33,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -69,6 +70,7 @@ type (
 		logger             log.Logger
 		metricsHandler     metrics.Handler
 		indexerConcurrency uint32
+		shutdownLock       sync.RWMutex
 	}
 
 	// ProcessorConfig contains all configs for processor
@@ -97,6 +99,10 @@ const (
 	visibilityProcessorName = "visibility-processor"
 )
 
+var (
+	errVisibilityShutdown = errors.New("visiblity processor was shut down")
+)
+
 // NewProcessor create new processorImpl
 func NewProcessor(
 	cfg *ProcessorConfig,
@@ -150,14 +156,15 @@ func (p *processorImpl) Stop() {
 		return
 	}
 
+	p.shutdownLock.Lock()
+	defer p.shutdownLock.Unlock()
+
 	err := p.bulkProcessor.Stop()
 	if err != nil {
 		// This could happen if ES is down when we're trying to shut down the server.
 		p.logger.Error("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err))
 		return
 	}
-	p.mapToAckFuture = nil
-	p.bulkProcessor = nil
 }
 
 func (p *processorImpl) hashFn(key interface{}) uint32 {
@@ -172,7 +179,17 @@ func (p *processorImpl) hashFn(key interface{}) uint32 {
 
 // Add request to the bulk and return a future object which will receive ack signal when request is processed.
 func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] {
-	newFuture := newAckFuture()
+	newFuture := newAckFuture() // Create future first to measure impact of following RWLock on latency
+
+	p.shutdownLock.RLock()
+	defer p.shutdownLock.RUnlock()
+
+	if atomic.LoadInt32(&p.status) == common.DaemonStatusStopped {
+		p.logger.Warn("Rejecting ES request for visibility task key because processor has been shut down.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc))
+		newFuture.future.Set(false, errVisibilityShutdown)
+		return newFuture.future
+	}
+
 	_, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error {
 		existingFuture, ok := value.(*ackFuture)
 		if !ok {
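One consequence for callers is that Add no longer blows up after shutdown; the returned future is already resolved with errVisibilityShutdown. A hypothetical caller-side helper, not part of this commit (addWithShutdownCheck and its parameters are illustrative, and it assumes context and errors are imported), could surface that condition like so:

// Hypothetical helper: resolves the ack future and treats a post-shutdown
// rejection as a normal, non-fatal error rather than a crash.
func addWithShutdownCheck(ctx context.Context, p *processorImpl, req *client.BulkableRequest, key string) (bool, error) {
	ackFuture := p.Add(req, key)
	ack, err := ackFuture.Get(ctx)
	if errors.Is(err, errVisibilityShutdown) {
		// Processor already stopped; callers can retry later or drop the task.
		return false, err
	}
	return ack, err
}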

common/persistence/visibility/store/elasticsearch/processor_test.go (+41 -2)
@@ -36,6 +36,7 @@ import (
 	"github.com/olivere/elastic/v7"
 	"github.com/stretchr/testify/suite"
 
+	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/collection"
 	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/future"
@@ -89,6 +90,7 @@ func (s *processorSuite) SetupTest() {
 	// esProcessor.Start mock
 	s.esProcessor.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, s.esProcessor.hashFn)
 	s.esProcessor.bulkProcessor = s.mockBulkProcessor
+	s.esProcessor.status = common.DaemonStatusStarted
 }
 
 func (s *processorSuite) TearDownTest() {
@@ -126,8 +128,8 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
 	s.NotNil(p.bulkProcessor)
 
 	p.Stop()
-	s.Nil(p.mapToAckFuture)
-	s.Nil(p.bulkProcessor)
+	s.NotNil(p.mapToAckFuture)
+	s.NotNil(p.bulkProcessor)
 }
 
 func (s *processorSuite) TestAdd() {
@@ -219,6 +221,43 @@ func (s *processorSuite) TestAdd_ConcurrentAdd_Duplicates() {
 	s.Equal(1, s.esProcessor.mapToAckFuture.Len(), "only one request should be in the bulk")
 }
 
+func (s *processorSuite) TestAdd_ConcurrentAdd_Shutdown() {
+	request := &client.BulkableRequest{}
+	docsCount := 1000
+	parallelFactor := 10
+	futures := make([]future.Future[bool], docsCount)
+
+	s.mockBulkProcessor.EXPECT().Add(request).MaxTimes(docsCount + 2) // +2 for explicit adds before and after shutdown
+	s.mockBulkProcessor.EXPECT().Stop().Return(nil).Times(1)
+	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).MaxTimes(docsCount + 2)
+
+	addBefore := s.esProcessor.Add(request, "test-key-before")
+
+	wg := sync.WaitGroup{}
+	wg.Add(parallelFactor + 1) // +1 for separate shutdown goroutine
+	for i := 0; i < parallelFactor; i++ {
+		go func(i int) {
+			for j := 0; j < docsCount/parallelFactor; j++ {
+				futures[i*docsCount/parallelFactor+j] = s.esProcessor.Add(request, fmt.Sprintf("test-key-%d-%d", i, j))
+			}
+			wg.Done()
+		}(i)
+	}
+	go func() {
+		time.Sleep(1 * time.Millisecond) // slight delay so at least a few docs get added
+		s.esProcessor.Stop()
+		wg.Done()
+	}()
+
+	wg.Wait()
+	addAfter := s.esProcessor.Add(request, "test-key-after")
+
+	s.False(addBefore.Ready()) // first request should be in bulk
+	s.True(addAfter.Ready())   // final request should be only error
+	_, err := addAfter.Get(context.Background())
+	s.ErrorIs(err, errVisibilityShutdown)
+}
+
 func (s *processorSuite) TestBulkAfterAction_Ack() {
 	version := int64(3)
 	testKey := "testKey"
