Updates the processor to close all potential stop chans #674
Conversation
Some streaming services, like the pagination one, use a different stop chan for each call, but the current implementation only closes the last one, which leads to goroutine leaks. Cancels dedis/cothority#2437
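The leak described here can be reproduced with a minimal sketch (hypothetical helper names, not the actual onet code): each call hands back its own stop chan, and closing only the last one strands every earlier goroutine.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// startCall simulates one streaming call: it returns a fresh stop channel
// and leaks a goroutine until that channel is closed.
func startCall() chan struct{} {
	stop := make(chan struct{})
	go func() {
		<-stop // blocks forever if stop is never closed
	}()
	return stop
}

// leakCount starts three calls and closes either all stop chans or, like the
// buggy processor, only the last one. It returns the goroutine delta.
func leakCount(closeAll bool) int {
	before := runtime.NumGoroutine()
	var chans []chan struct{}
	for i := 0; i < 3; i++ {
		chans = append(chans, startCall())
	}
	if closeAll {
		for _, c := range chans {
			close(c)
		}
	} else {
		close(chans[len(chans)-1]) // buggy: only the last stop chan
	}
	time.Sleep(50 * time.Millisecond) // let the unblocked goroutines exit
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("leaked (close last only):", leakCount(false)) // 2
	fmt.Println("leaked (close all):", leakCount(true))        // 0
}
```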
@ineiti SonarCloud makes it fail for no reason. The PR is ready otherwise.
Some changes needed for correctly handling the channels in the processor.
require.Nil(t, p.RegisterStreamingHandler(h))

n := 5
buf, err := protobuf.Encode(&testMsg{int64(n)})
That's already tested in the _Simple test, so you can remove it here.
@@ -201,6 +201,57 @@ func TestServiceProcessor_ProcessClientRequest_Streaming(t *testing.T) {
	close(inputChan)
}

func TestServiceProcessor_ProcessClientRequest_Streaming_Multiple(t *testing.T) {
Just for me to understand, please confirm or correct as necessary:
- this test sends multiple requests, simulating a single client sending messages over the same connection
- do we need to test multiple clients sending multiple requests? Or is that handled in websocket.go?
> this test sends multiple requests, simulating a single client sending messages over the same connection

yes

> do we need to test multiple clients sending multiple requests?

The ProcessClientStreamRequest is mostly pure, except that it uses the global p.handlers (which is not thread-safe, but that's another story...) and p.Context.server.Suite(). So no, there isn't a huge need to check for multiple clients sending multiple requests, as it would be the same as calling the function multiple times.
// This goroutine listens on any new messages from the client and executes
// the request. Executing the request should fill the service's channel, as
// the service will use the same channel for further requests.
stopAll := make(chan struct{})
closing := sync.Mutex{}

go func() {
I'm pretty sure that the channels are not correctly handled here. Some things to take into account:
- every call to callInterfaceFunc will create a new stopServiceChan
- I think the one from the previous call should be closed to indicate that the previous call is done now
- there are some returns which will not close the channels

One possibility is to have only one select in the go-routine. But there are some initial conditions that are a bit awkward: stopServiceChan and reply need to be initialized as empty channels at the beginning, and before closing, the stopServiceChan always needs to be tested in a select.

Anyway, I hope this reflects my proposition to close all past calls to callInterfaceFunc whenever a new input comes in. Also, it reduces the number of go-routines from 3 to 1. And it should correctly handle any errors that are printed and then returned.

Perhaps @tharvik has a better idea how to handle the initial conditions of stopServiceChan and reply...
go func() {
	// stopServiceChan and reply need to be initialized as empty channels
	stopServiceChan := make(chan struct{})
	reply := make(chan interface{})
	defer func() {
		close(stopServiceChan) // need to use a select here
		close(outChan)
	}()
	for {
		select {
		case buf, ok := <-clientInputs:
			if !ok {
				return
			}
			close(stopServiceChan) // that one might work w/o the select
			reply, stopServiceChan, err = callInterfaceFunc(...)
			if err != nil {
				log.Error(err)
				return
			}
		case out := <-reply: // I suppose that the reply channel will never be closed. I also suppose you'll find the reflect magic to have an actual channel ;)
			outChan <- out
		}
	}
}()
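The "close needs to be tested in a select" point from the sketch above could look like the following (a minimal illustration, not onet code). Note this is only race-free when a single goroutine is responsible for closing the channel:

```go
package main

import "fmt"

// safeClose closes c only if it is not already closed, by first testing it
// in a select: receiving from a closed channel returns immediately, while an
// open, empty channel falls through to the default case and gets closed.
// CAVEAT: only safe when one goroutine is in charge of closing c, and c is
// never sent to.
func safeClose(c chan struct{}) {
	select {
	case <-c:
		// already closed: nothing to do
	default:
		close(c)
	}
}

func main() {
	c := make(chan struct{})
	safeClose(c)
	safeClose(c) // second call is a no-op instead of panicking
	fmt.Println("closed twice without panic")
}
```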
> I think the one from the previous call should be closed to indicate that the previous call is done now

no, because it would prematurely close the outChan, which is used among all calls.

> there are some returns which will not close the channels

please elaborate
> no, because it would prematurely close the outChan, which is used among all calls.

In your code this is true. But in the select I propose above, this wouldn't happen anymore.

If you don't close the previous stopServiceChan, every time a user sends a message through an existing connection you'll have one more callInterfaceFunc sending data. This is probably not a problem for the paginate service, as it stops after the final block is sent anyway. But for a service like StreamingTransaction, if the user sends a message to an existing connection, you'll have two go-routines sending block updates to the client.

So it looks cleaner and simpler to me to have only one callInterfaceFunc active at any one time, by closing the stopServiceChan.

> please elaborate

Any of the returns at the beginning of the for-loop will just silently quit and not close the outChan to indicate that the connection is done.
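The usual fix for such silent returns is a deferred close (a generic sketch with made-up names, not the processor's actual code): closing outChan in a defer guarantees it happens on every return path, early errors included, so the reader of outChan always learns the connection is done.

```go
package main

import "fmt"

// process reads inputs and forwards results on outChan. The deferred close
// runs on EVERY return path, so early error returns still signal completion.
func process(inputs <-chan int, outChan chan<- int) {
	defer close(outChan) // guaranteed, even on the early return below
	for in := range inputs {
		if in < 0 {
			// simulated error: without the defer, outChan would stay open
			return
		}
		outChan <- in * 2
	}
}

func main() {
	inputs := make(chan int, 3)
	outChan := make(chan int)
	inputs <- 1
	inputs <- 2
	inputs <- -1 // triggers the early return
	close(inputs)

	go process(inputs, outChan)
	for out := range outChan { // terminates because outChan gets closed
		fmt.Println(out)
	}
	fmt.Println("outChan closed, connection done")
}
```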
I tried the solution proposed by @ineiti for a while, but without success. I would be happy to discuss more about the best way to refactor the...
SonarCloud Quality Gate failed.
For further discussion, see #675
@ineiti @tharvik: this is related to dedis/cothority#2437. Based on your comments, Linus, I think the problem should be solved by onet, as I originally did, but I finally went with the cothority PR. The pagination service should be completely stateless, and onet should ensure it closes all the stop channels.