Skip to content

Commit

Permalink
rpcsrv: properly cleanup WS reader in tests
Browse files Browse the repository at this point in the history
Close #3378.

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
  • Loading branch information
AnnaShaleva committed Apr 1, 2024
1 parent 00ac38b commit 1b072ea
Showing 1 changed file with 21 additions and 36 deletions.
57 changes: 21 additions & 36 deletions pkg/services/rpcsrv/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

const testOverflow = false

func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool) {
for {
func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished *atomic.Bool, readerToExitCh chan struct{}) {
for isFinished.Load() {
err := ws.SetReadDeadline(time.Now().Add(time.Second))
if isFinished.Load() {
require.Error(t, err)
Expand All @@ -36,6 +36,7 @@ func wsReader(t *testing.T, ws *websocket.Conn, msgCh chan<- []byte, isFinished
require.NoError(t, err)
msgCh <- body
}
close(readerToExitCh)
}

func callWSGetRaw(t *testing.T, ws *websocket.Conn, msg string, respCh <-chan []byte) *neorpc.Response {
Expand All @@ -56,7 +57,7 @@ func getNotification(t *testing.T, respCh <-chan []byte) *neorpc.Notification {
return resp
}

func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte, *atomic.Bool) {
func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *websocket.Conn, chan []byte) {
chain, rpcSrv, httpSrv := initClearServerWithInMemoryChain(t)

dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
Expand All @@ -69,8 +70,14 @@ func initCleanServerAndWSClient(t *testing.T) (*core.Blockchain, *Server, *webso
// responses from it.
respMsgs := make(chan []byte, 16)
finishedFlag := &atomic.Bool{}
go wsReader(t, ws, respMsgs, finishedFlag)
return chain, rpcSrv, ws, respMsgs, finishedFlag
readerToExitCh := make(chan struct{})
go wsReader(t, ws, respMsgs, finishedFlag, readerToExitCh)
t.Cleanup(func() {
finishedFlag.Store(true)
<-readerToExitCh
ws.Close()
})
return chain, rpcSrv, ws, respMsgs
}

func callSubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, params string) string {
Expand All @@ -95,8 +102,7 @@ func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_of_added_block"}

chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)

chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t)
rpcSrv.coreServer.Start()
defer rpcSrv.coreServer.Shutdown()

Expand Down Expand Up @@ -156,8 +162,6 @@ func TestSubscriptions(t *testing.T) {
for _, id := range subIDs {
callUnsubscribe(t, c, respMsgs, id)
}
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestFilteredSubscriptions(t *testing.T) {
Expand Down Expand Up @@ -293,7 +297,7 @@ func TestFilteredSubscriptions(t *testing.T) {

for name, this := range cases {
t.Run(name, func(t *testing.T) {
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
chain, _, c, respMsgs := initCleanServerAndWSClient(t)

// It's used as an end-of-event-stream, so it's always present.
blockSubID := callSubscribe(t, c, respMsgs, `["block_added"]`)
Expand All @@ -320,8 +324,6 @@ func TestFilteredSubscriptions(t *testing.T) {

callUnsubscribe(t, c, respMsgs, subID)
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CompareAndSwap(false, true)
c.Close()
})
}
}
Expand Down Expand Up @@ -389,7 +391,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
},
}

chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
chain, rpcSrv, c, respMsgs := initCleanServerAndWSClient(t)
rpcSrv.coreServer.Start()

// blocks are needed to make GAS deposit for priv0
Expand Down Expand Up @@ -421,15 +423,13 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
callUnsubscribe(t, c, respMsgs, subID)
})
}
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestFilteredBlockSubscriptions(t *testing.T) {
// We can't fit this into TestFilteredSubscriptions, because it uses
// blocks as EOF events to wait for.
const numBlocks = 10
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
chain, _, c, respMsgs := initCleanServerAndWSClient(t)

blockSubID := callSubscribe(t, c, respMsgs, `["block_added", {"primary":3}]`)

Expand Down Expand Up @@ -458,13 +458,11 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
const numBlocks = 10
chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
chain, _, c, respMsgs := initCleanServerAndWSClient(t)

headerSubID := callSubscribe(t, c, respMsgs, `["header_of_added_block", {"primary":3}]`)

Expand Down Expand Up @@ -493,13 +491,11 @@ func TestHeaderOfAddedBlockSubscriptions(t *testing.T) {
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, headerSubID)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestMaxSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
_, _, c, respMsgs := initCleanServerAndWSClient(t)

for i := 0; i < maxFeeds+1; i++ {
var s string
Expand All @@ -518,9 +514,6 @@ func TestMaxSubscriptions(t *testing.T) {
require.Nil(t, resp.Result)
}
}

finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestBadSubUnsub(t *testing.T) {
Expand All @@ -542,7 +535,7 @@ func TestBadSubUnsub(t *testing.T) {
"bad id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["vasiliy"], "id": 1}`,
"not subscribed id": `{"jsonrpc": "2.0", "method": "unsubscribe", "params": ["7"], "id": 1}`,
}
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
_, _, c, respMsgs := initCleanServerAndWSClient(t)

testF := func(t *testing.T, cases map[string]string) func(t *testing.T) {
return func(t *testing.T) {
Expand All @@ -557,9 +550,6 @@ func TestBadSubUnsub(t *testing.T) {
}
t.Run("subscribe", testF(t, subCases))
t.Run("unsubscribe", testF(t, unsubCases))

finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func doSomeWSRequest(t *testing.T, ws *websocket.Conn) {
Expand Down Expand Up @@ -625,7 +615,7 @@ func TestSubscriptionOverflow(t *testing.T) {
const blockCnt = notificationBufSize * 5
var receivedMiss bool

chain, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
chain, _, c, respMsgs := initCleanServerAndWSClient(t)

resp := callWSGetRaw(t, c, `{"jsonrpc": "2.0","method": "subscribe","params": ["block_added"],"id": 1}`, respMsgs)
require.Nil(t, resp.Error)
Expand All @@ -647,9 +637,6 @@ func TestSubscriptionOverflow(t *testing.T) {
require.Equal(t, true, receivedMiss)
// `Missed` is the last event and there is nothing afterwards.
require.Equal(t, 0, len(respMsgs))

finishedFlag.CompareAndSwap(false, true)
c.Close()
}

func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
Expand All @@ -663,7 +650,7 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
params: `["transaction_executed", {"state":"NOTHALT"}]`,
},
}
_, _, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
_, _, c, respMsgs := initCleanServerAndWSClient(t)

for name, this := range cases {
t.Run(name, func(t *testing.T) {
Expand All @@ -673,6 +660,4 @@ func TestFilteredSubscriptions_InvalidFilter(t *testing.T) {
require.Contains(t, resp.Error.Error(), neorpc.ErrInvalidSubscriptionFilter.Error())
})
}
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

0 comments on commit 1b072ea

Please sign in to comment.