@@ -201,9 +201,13 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
201
201
logInfoWithMDC(" Stopping Artemis because stopping AMQP bridge" )
202
202
closeConsumer()
203
203
consumer = null
204
+ val closingSession = session
204
205
eventLoop.execute {
205
206
artemis(ArtemisState .STOPPING ) {
206
- stopSession()
207
+ stopSession(session)
208
+ if (session != closingSession) {
209
+ stopSession(closingSession)
210
+ }
207
211
session = null
208
212
ArtemisState .STOPPED
209
213
}
@@ -271,19 +275,28 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
271
275
logInfoWithMDC(" Stopping Artemis because AMQP bridge disconnected" )
272
276
closeConsumer()
273
277
consumer = null
278
+ val closingSession = session
274
279
eventLoop.execute {
275
- artemis(ArtemisState .STOPPING ) {
276
- stopSession()
277
- session = null
278
- when (precedingState) {
279
- ArtemisState .AMQP_STOPPED ->
280
- ArtemisState .STOPPED_AMQP_START_SCHEDULED (scheduledArtemis(artemisHeartbeatPlusBackoff,
281
- TimeUnit .MILLISECONDS , ArtemisState .AMQP_STARTING ) { startOutbound() })
282
- ArtemisState .AMQP_RESTARTED -> {
283
- artemis(ArtemisState .AMQP_STARTING ) { startOutbound() }
284
- ArtemisState .AMQP_STARTING
280
+ synchronized(artemis!! ) {
281
+ if (session == closingSession) {
282
+ artemis(ArtemisState .STOPPING ) {
283
+ stopSession(session)
284
+ session = null
285
+ when (precedingState) {
286
+ ArtemisState .AMQP_STOPPED ->
287
+ ArtemisState .STOPPED_AMQP_START_SCHEDULED (scheduledArtemis(artemisHeartbeatPlusBackoff,
288
+ TimeUnit .MILLISECONDS , ArtemisState .AMQP_STARTING ) { startOutbound() })
289
+
290
+ ArtemisState .AMQP_RESTARTED -> {
291
+ artemis(ArtemisState .AMQP_STARTING ) { startOutbound() }
292
+ ArtemisState .AMQP_STARTING
293
+ }
294
+
295
+ else -> ArtemisState .STOPPED
296
+ }
285
297
}
286
- else -> ArtemisState .STOPPED
298
+ } else {
299
+ stopSession(closingSession)
287
300
}
288
301
}
289
302
}
@@ -339,10 +352,10 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
339
352
}
340
353
}
341
354
342
- private fun stopSession (): Boolean {
355
+ private fun stopSession (localSession : ClientSession ? ): Boolean {
343
356
var stopped = false
344
357
try {
345
- session ?.apply {
358
+ localSession ?.apply {
346
359
if (! isClosed) {
347
360
stop()
348
361
}
@@ -356,7 +369,7 @@ open class AMQPBridgeManager(keyStore: CertificateStore,
356
369
}
357
370
358
371
private fun restartSession (): Boolean {
359
- if (! stopSession()) {
372
+ if (! stopSession(session )) {
360
373
// Session timed out stopping. The request/responses can be out of sequence on the session now, so abandon it.
361
374
session = null
362
375
// The consumer is also dead now too as attached to the dead session.
0 commit comments