From 211e354d3de68494403f5de4a7f3da9e5eb3e520 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Wed, 21 Feb 2024 12:15:13 +0300 Subject: [PATCH] network: fix server shutdown by waiting for goroutines to finish s.Shutdown() does not wait for all goroutines of the node server to finish normally just because the server exits without awaiting its dependent goroutines, which causes logs to attempt to write after the test has ended. The consequence of this bug fix is that corresponding tests are fixed. Close #2973 Close #2974 Signed-off-by: Ekaterina Pavlova --- pkg/network/server.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 4dafe90131..5398f4327e 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -125,11 +125,14 @@ type ( // lastRequestedHeader contains a height of the last requested header. lastRequestedHeader atomic.Uint32 - register chan Peer - unregister chan peerDrop - handshake chan Peer - quit chan struct{} - relayFin chan struct{} + register chan Peer + unregister chan peerDrop + handshake chan Peer + quit chan struct{} + relayFin chan struct{} + runFin chan struct{} + broadcastTxFin chan struct{} + runProtoFin chan struct{} transactions chan *transaction.Transaction @@ -143,6 +146,8 @@ type ( started atomic.Bool // runLoopStarted indicates that the server's main run loop has started. 
runLoopStarted atomic.Bool + + txHandlerLoopWG sync.WaitGroup } peerDrop struct { @@ -185,6 +190,9 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy config: chain.GetConfig().ProtocolConfiguration, quit: make(chan struct{}), relayFin: make(chan struct{}), + runFin: make(chan struct{}), + broadcastTxFin: make(chan struct{}), + runProtoFin: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), handshake: make(chan Peer), @@ -281,6 +289,7 @@ func (s *Server) Start() { s.initStaleMemPools() var txThreads = optimalNumOfThreads() + s.txHandlerLoopWG.Add(txThreads) for i := 0; i < txThreads; i++ { go s.txHandlerLoop() } @@ -321,7 +330,11 @@ func (s *Server) Shutdown() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) + <-s.broadcastTxFin + <-s.runProtoFin <-s.relayFin + <-s.runFin + s.txHandlerLoopWG.Wait() _ = s.log.Sync() } @@ -440,6 +453,7 @@ func (s *Server) run() { s.runLoopStarted.Store(true) defer func() { s.runLoopStarted.Store(false) + close(s.runFin) }() defer addrTimer.Stop() defer peerTimer.Stop() @@ -539,6 +553,7 @@ func (s *Server) run() { // runProto is a goroutine that manages server-wide protocol events. func (s *Server) runProto() { + defer close(s.runProtoFin) pingTimer := time.NewTimer(s.PingInterval) for { prevHeight := s.chain.BlockHeight() @@ -1141,6 +1156,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } func (s *Server) txHandlerLoop() { + defer s.txHandlerLoopWG.Done() txloop: for { select { @@ -1657,6 +1673,7 @@ func (s *Server) broadcastTxLoop() { batchSize = 42 ) + defer close(s.broadcastTxFin) txs := make([]util.Uint256, 0, batchSize) var timer *time.Timer