-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Better handle request with same group id #498
Conversation
if (!encoders.isEmpty()) | ||
{ | ||
signaler.signalNow(originId, routedId, initialId, traceId, SIGNAL_NEXT_REQUEST, 0); | ||
} | ||
} | ||
|
||
private void onSynGroupRebalance( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onSyncGroupRebalance
.../java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java
Show resolved
Hide resolved
@@ -909,6 +951,9 @@ private void onSignal( | |||
case SIGNAL_STREAM_INITIAL_RESET: | |||
doStreamReset(traceId); | |||
break; | |||
case SIGNAL_STREAM_REPLY_WINDOW: | |||
doStreamWindow(traceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not a doXXX
method as it doesn't write a frame for stream.
Please rename doStreamWindow
to onStreamSignalReplyWindow
.
Also, given that the signals are used to defer something that happened, it might be better to name them for where they originated, rather than how you want them to react.
For example, SIGNAL_STREAM_BEGIN
(triggered from onStreamBegin
) and then the reaction can either be doStreamBegin
directly or doStreamBegin
called from onStreamSignalBegin
.
{ | ||
KafkaClientStream stream = streamsByInitialIds.get(s); | ||
stream.cleanup(traceId); | ||
streamsByInitialIds.remove(s); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to remove this one by one, or perhaps this instead?
...
streams.forEach(s -> streamsByInitialIds.get(s).cleanup(traceId));
streamsByInitialIds.clear();
...
Please rename streamsByInitialIds
to streamsByInitialId
.
streamsByInitialIds.remove(streamId); | ||
|
||
KafkaClientStream stream = streamsByInitialIds.get(streamId); | ||
if (stream.replyAck == stream.replySeq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we not always set stream.replyAck = stream.replySeq
so that a partial stream cannot block progress of connection window?
} | ||
} | ||
|
||
private void doStreamWindow( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets rename this to flushStreamWindow
to avoid confusion with doStreamWindow(traceId, authorization)
which sends an initial window frame on the stream.
|
||
final long traceId = begin.traceId(); | ||
final OctetsFW extension = begin.extension(); | ||
final ProxyBeginExFW proxyBeginEx = extension.get(proxyBeginExRO::tryWrap); | ||
|
||
String host = null; | ||
int port = 0; | ||
|
||
if (proxyBeginEx != null) | ||
{ | ||
final ProxyAddressInetFW inet = proxyBeginEx.address().inet(); | ||
host = inet.destination().asString(); | ||
port = inet.destinationPort(); | ||
} | ||
|
||
connection.doConnectionBegin(traceId, host, port); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementing the behavior here seems inconsistent with the other frame types.
Perhaps we can move it to onStreamBegin
instead?
Description
The following bugs have been identified while trying to make mqtt pub request with the same client id
Due to the wrong decoder assignment we may get into a bad state
Not being able to handle unknown member id in heartbeat response
Fixes #500