Skip to content

Commit 9c6603d

Browse files
authored
GODRIVER-2335 Preemptively cancel in progress operations when SDAM heartbeats timeout. (#1549)
1 parent 556e2f2 commit 9c6603d

22 files changed

+1572
-83
lines changed

event/monitoring.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,9 @@ type PoolEvent struct {
120120
Reason string `json:"reason"`
121121
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
122122
// can be used to distinguish between individual servers in a load balanced deployment.
123-
ServiceID *primitive.ObjectID `json:"serviceId"`
124-
Error error `json:"error"`
123+
ServiceID *primitive.ObjectID `json:"serviceId"`
124+
Interruption bool `json:"interruptInUseConnections"`
125+
Error error `json:"error"`
125126
}
126127

127128
// PoolMonitor is a function that allows the user to gain access to events occurring in the pool

internal/eventtest/eventtest.go

+8
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,11 @@ func (tpm *TestPoolMonitor) IsPoolCleared() bool {
7676
})
7777
return len(poolClearedEvents) > 0
7878
}
79+
80+
// Interruptions returns the number of interruptions in the events recorded by the testPoolMonitor.
81+
func (tpm *TestPoolMonitor) Interruptions() int {
82+
interruptions := tpm.Events(func(evt *event.PoolEvent) bool {
83+
return evt.Interruption
84+
})
85+
return len(interruptions)
86+
}

mongo/integration/unified/client_entity.go

+2
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b
571571
switch strings.ToLower(key) {
572572
case "appname":
573573
clientOpts.SetAppName(value.(string))
574+
case "connecttimeoutms":
575+
clientOpts.SetConnectTimeout(time.Duration(value.(int32)) * time.Millisecond)
574576
case "heartbeatfrequencyms":
575577
clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
576578
case "loadbalanced":

mongo/integration/unified/entity.go

+69
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,70 @@ func newCollectionEntityOptions(id string, databaseID string, collectionName str
112112
return options
113113
}
114114

115+
type task struct {
116+
name string
117+
execute func() error
118+
}
119+
120+
type backgroundRoutine struct {
121+
tasks chan *task
122+
wg sync.WaitGroup
123+
err error
124+
}
125+
126+
func (b *backgroundRoutine) start() {
127+
b.wg.Add(1)
128+
129+
go func() {
130+
defer b.wg.Done()
131+
132+
for t := range b.tasks {
133+
if b.err != nil {
134+
continue
135+
}
136+
137+
ch := make(chan error)
138+
go func(task *task) {
139+
ch <- task.execute()
140+
}(t)
141+
select {
142+
case err := <-ch:
143+
if err != nil {
144+
b.err = fmt.Errorf("error running operation %s: %v", t.name, err)
145+
}
146+
case <-time.After(10 * time.Second):
147+
b.err = fmt.Errorf("timed out after 10 seconds")
148+
}
149+
}
150+
}()
151+
}
152+
153+
func (b *backgroundRoutine) stop() error {
154+
close(b.tasks)
155+
b.wg.Wait()
156+
return b.err
157+
}
158+
159+
func (b *backgroundRoutine) addTask(name string, execute func() error) bool {
160+
select {
161+
case b.tasks <- &task{
162+
name: name,
163+
execute: execute,
164+
}:
165+
return true
166+
default:
167+
return false
168+
}
169+
}
170+
171+
func newBackgroundRoutine() *backgroundRoutine {
172+
routine := &backgroundRoutine{
173+
tasks: make(chan *task, 10),
174+
}
175+
176+
return routine
177+
}
178+
115179
type clientEncryptionOpts struct {
116180
KeyVaultClient string `bson:"keyVaultClient"`
117181
KeyVaultNamespace string `bson:"keyVaultNamespace"`
@@ -136,6 +200,7 @@ type EntityMap struct {
136200
successValues map[string]int32
137201
iterationValues map[string]int32
138202
clientEncryptionEntities map[string]*mongo.ClientEncryption
203+
routinesMap sync.Map // maps thread name to *backgroundRoutine
139204
evtLock sync.Mutex
140205
closed atomic.Value
141206
// keyVaultClientIDs tracks IDs of clients used as a keyVaultClient in ClientEncryption objects.
@@ -283,6 +348,10 @@ func (em *EntityMap) addEntity(ctx context.Context, entityType string, entityOpt
283348
err = em.addCollectionEntity(entityOptions)
284349
case "session":
285350
err = em.addSessionEntity(entityOptions)
351+
case "thread":
352+
routine := newBackgroundRoutine()
353+
em.routinesMap.Store(entityOptions.ID, routine)
354+
routine.start()
286355
case "bucket":
287356
err = em.addGridFSBucketEntity(entityOptions)
288357
case "clientEncryption":

mongo/integration/unified/event_verification.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ type cmapEvent struct {
6161
ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`
6262

6363
PoolClearedEvent *struct {
64-
HasServiceID *bool `bson:"hasServiceId"`
64+
HasServiceID *bool `bson:"hasServiceId"`
65+
InterruptInUseConnections *bool `bson:"interruptInUseConnections"`
6566
} `bson:"poolClearedEvent"`
6667
}
6768

@@ -361,6 +362,10 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
361362
return newEventVerificationError(idx, client, "error verifying serviceID: %v", err)
362363
}
363364
}
365+
if expectInterruption := evt.PoolClearedEvent.InterruptInUseConnections; expectInterruption != nil && *expectInterruption != actual.Interruption {
366+
return newEventVerificationError(idx, client, "expected interruptInUseConnections %v, got %v",
367+
expectInterruption, actual.Interruption)
368+
}
364369
default:
365370
return newEventVerificationError(idx, client, "no expected event set on cmapEvent instance")
366371
}

mongo/integration/unified/testrunner_operation.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ func (lp *loopArgs) iterationsStored() bool {
5050
return lp.IterationsEntityID != ""
5151
}
5252

53-
func executeTestRunnerOperation(ctx context.Context, operation *operation, loopDone <-chan struct{}) error {
54-
args := operation.Arguments
53+
func executeTestRunnerOperation(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
54+
args := op.Arguments
5555

56-
switch operation.Name {
56+
switch op.Name {
5757
case "failPoint":
5858
clientID := lookupString(args, "client")
5959
client, err := entities(ctx).client(clientID)
@@ -187,9 +187,34 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
187187
}
188188
}
189189
return nil
190+
case "runOnThread":
191+
operationRaw, err := args.LookupErr("operation")
192+
if err != nil {
193+
return fmt.Errorf("'operation' argument not found in runOnThread operation")
194+
}
195+
threadOp := new(operation)
196+
if err := operationRaw.Unmarshal(threadOp); err != nil {
197+
return fmt.Errorf("error unmarshaling 'operation' argument: %v", err)
198+
}
199+
thread := lookupString(args, "thread")
200+
routine, ok := entities(ctx).routinesMap.Load(thread)
201+
if !ok {
202+
return fmt.Errorf("run on unknown thread: %s", thread)
203+
}
204+
routine.(*backgroundRoutine).addTask(threadOp.Name, func() error {
205+
return threadOp.execute(ctx, loopDone)
206+
})
207+
return nil
208+
case "waitForThread":
209+
thread := lookupString(args, "thread")
210+
routine, ok := entities(ctx).routinesMap.Load(thread)
211+
if !ok {
212+
return fmt.Errorf("wait for unknown thread: %s", thread)
213+
}
214+
return routine.(*backgroundRoutine).stop()
190215
case "waitForEvent":
191216
var wfeArgs waitForEventArguments
192-
if err := bson.Unmarshal(operation.Arguments, &wfeArgs); err != nil {
217+
if err := bson.Unmarshal(op.Arguments, &wfeArgs); err != nil {
193218
return fmt.Errorf("error unmarshalling event to waitForEventArguments: %v", err)
194219
}
195220

@@ -198,7 +223,7 @@ func executeTestRunnerOperation(ctx context.Context, operation *operation, loopD
198223

199224
return waitForEvent(wfeCtx, wfeArgs)
200225
default:
201-
return fmt.Errorf("unrecognized testRunner operation %q", operation.Name)
226+
return fmt.Errorf("unrecognized testRunner operation %q", op.Name)
202227
}
203228
}
204229

mongo/integration/unified/unified_spec_runner.go

+6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ var (
2828
// the "find" and one for the "getMore", but we send three for both.
2929
"A successful find event with a getmore and the server kills the cursor (<= 4.4)": "See GODRIVER-1773",
3030

31+
// GODRIVER-2577: The following spec tests require canceling ops immediately, but the current logic clears pools
32+
// and cancels in-progress ops after two the heartbeat failures.
33+
"Connection pool clear uses interruptInUseConnections=true after monitor timeout": "Godriver clears after multiple timeout",
34+
"Error returned from connection pool clear with interruptInUseConnections=true is retryable": "Godriver clears after multiple timeout",
35+
"Error returned from connection pool clear with interruptInUseConnections=true is retryable for write": "Godriver clears after multiple timeout",
36+
3137
// TODO(GODRIVER-2843): Fix and unskip these test cases.
3238
"Find operation with snapshot": "Test fails frequently. See GODRIVER-2843",
3339
"Write commands with snapshot session do not affect snapshot reads": "Test fails frequently. See GODRIVER-2843",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"version": 1,
3+
"style": "unit",
4+
"description": "Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)",
5+
"poolOptions": {
6+
"backgroundThreadIntervalMS": 10000
7+
},
8+
"operations": [
9+
{
10+
"name": "ready"
11+
},
12+
{
13+
"name": "checkOut"
14+
},
15+
{
16+
"name": "checkOut",
17+
"label": "conn"
18+
},
19+
{
20+
"name": "clear",
21+
"interruptInUseConnections": true
22+
},
23+
{
24+
"name": "waitForEvent",
25+
"event": "ConnectionPoolCleared",
26+
"count": 1,
27+
"timeout": 1000
28+
},
29+
{
30+
"name": "waitForEvent",
31+
"event": "ConnectionClosed",
32+
"count": 2,
33+
"timeout": 1000
34+
},
35+
{
36+
"name": "close"
37+
}
38+
],
39+
"events": [
40+
{
41+
"type": "ConnectionCheckedOut",
42+
"connectionId": 1,
43+
"address": 42
44+
},
45+
{
46+
"type": "ConnectionCheckedOut",
47+
"connectionId": 2,
48+
"address": 42
49+
},
50+
{
51+
"type": "ConnectionPoolCleared",
52+
"interruptInUseConnections": true
53+
},
54+
{
55+
"type": "ConnectionClosed",
56+
"reason": "stale",
57+
"address": 42
58+
},
59+
{
60+
"type": "ConnectionClosed",
61+
"reason": "stale",
62+
"address": 42
63+
},
64+
{
65+
"type": "ConnectionPoolClosed",
66+
"address": 42
67+
}
68+
],
69+
"ignore": [
70+
"ConnectionCreated",
71+
"ConnectionPoolReady",
72+
"ConnectionReady",
73+
"ConnectionCheckOutStarted",
74+
"ConnectionPoolCreated",
75+
"ConnectionCheckedIn"
76+
]
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
version: 1
2+
style: unit
3+
description: Connections MUST be interrupted as soon as possible (interruptInUseConnections=true)
4+
poolOptions:
5+
# ensure it's not involved by default
6+
backgroundThreadIntervalMS: 10000
7+
operations:
8+
- name: ready
9+
- name: checkOut
10+
- name: checkOut
11+
label: conn
12+
- name: clear
13+
interruptInUseConnections: true
14+
- name: waitForEvent
15+
event: ConnectionPoolCleared
16+
count: 1
17+
timeout: 1000
18+
- name: waitForEvent
19+
event: ConnectionClosed
20+
count: 2
21+
timeout: 1000
22+
- name: close
23+
events:
24+
- type: ConnectionCheckedOut
25+
connectionId: 1
26+
address: 42
27+
- type: ConnectionCheckedOut
28+
connectionId: 2
29+
address: 42
30+
- type: ConnectionPoolCleared
31+
interruptInUseConnections: true
32+
- type: ConnectionClosed
33+
reason: stale
34+
address: 42
35+
- type: ConnectionClosed
36+
reason: stale
37+
address: 42
38+
- type: ConnectionPoolClosed
39+
address: 42
40+
ignore:
41+
- ConnectionCreated
42+
- ConnectionPoolReady
43+
- ConnectionReady
44+
- ConnectionCheckOutStarted
45+
- ConnectionPoolCreated
46+
- ConnectionCheckedIn

0 commit comments

Comments
 (0)