Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve mqtt-kafka to use only one kafka consumer group per mqtt client. #727

Merged
merged 68 commits into from
Jan 11, 2024
Merged

Improve mqtt-kafka to use only one kafka consumer group per mqtt client. #727

merged 68 commits into from
Jan 11, 2024

Conversation

akrambek
Copy link
Contributor

Description

Improve mqtt-kafka to use only one kafka consumer group per mqtt client.

akrambek and others added 30 commits October 25, 2023 13:35
…artial data frame while computing crc32c value
@@ -529,6 +529,8 @@ public void writeProduceEntryStart(
MutableInteger position,
long timestamp,
long ownerId,
long produceId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
long produceId,
long producerId,

entryInfo.putInt(6 * Long.BYTES + Integer.BYTES, 0x00);
entryInfo.putInt(6 * Long.BYTES + 2 * Integer.BYTES, NO_DELTA_POSITION);
entryInfo.putShort(6 * Long.BYTES + 3 * Integer.BYTES, ackMode.value());
entryInfo.putLong(4 * Long.BYTES, produceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
entryInfo.putLong(4 * Long.BYTES, produceId);
entryInfo.putLong(4 * Long.BYTES, producerId);

factories.put(KafkaBeginExFW.KIND_OFFSET_FETCH, cacheOffsetFetchFactory);
factories.put(KafkaBeginExFW.KIND_INIT_PRODUCE_ID, cacheInitProduceIdFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INIT_PRODUCER_ID

@@ -96,6 +96,8 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW())
.wrap(new UnsafeBuffer(new byte[8]), 0, 8)
.build();
private static final long PRODUCE_FLUSH_PRODUCE_ID = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final long PRODUCE_FLUSH_PRODUCE_ID = -1;
private static final long PRODUCE_FLUSH_PRODUCER_ID = -1;

@@ -96,6 +96,8 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
new Array32FW.Builder<>(new KafkaHeaderFW.Builder(), new KafkaHeaderFW())
.wrap(new UnsafeBuffer(new byte[8]), 0, 8)
.build();
private static final long PRODUCE_FLUSH_PRODUCE_ID = -1;
private static final short PRODUCE_FLUSH_PRODUCE_EPOCH = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final short PRODUCE_FLUSH_PRODUCE_EPOCH = -1;
private static final short PRODUCE_FLUSH_PRODUCER_EPOCH = -1;

return limit;
}

private final class KafkaInitProduceIdStream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final class KafkaInitProduceIdStream
private final class KafkaInitProducerIdStream

}
}

private final class KafkaInitProduceIdClient extends KafkaSaslClient
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private final class KafkaInitProduceIdClient extends KafkaSaslClient
private final class KafkaInitProducerIdClient extends KafkaSaslClient

Comment on lines 1184 to 1188
final InitProduceIdRequestFW initProduceIdRequest =
initProduceIdRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
.producerId(produceId)
.producerEpoch(produceEpoch)
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final InitProduceIdRequestFW initProduceIdRequest =
initProduceIdRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
.producerId(produceId)
.producerEpoch(produceEpoch)
.build();
final InitProducerIdRequestFW initProducerIdRequest =
initProducerIdRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
.producerId(produceId)
.producerEpoch(produceEpoch)
.build();

signaler.signalNow(originId, routedId, initialId, traceId, SIGNAL_NEXT_REQUEST, 0);
}

private void onDecodeInitProduceIdResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void onDecodeInitProduceIdResponse(
private void onDecodeInitProducerIdResponse(

@@ -185,6 +186,7 @@ scope kafka
case 253: kafka::stream::KafkaGroupBeginEx group;
case 254: kafka::stream::KafkaBootstrapBeginEx bootstrap;
case 255: kafka::stream::KafkaMergedBeginEx merged;
case 22: kafka::stream::KafkaInitProduceIdBeginEx initProduceId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
case 22: kafka::stream::KafkaInitProduceIdBeginEx initProduceId;
case 22: kafka::stream::KafkaInitProducerIdBeginEx initProducerId;


KafkaInitProduceIdBeginExFW initProduceIdBeginEx = beginEx.initProduceId();
KafkaInitProducerIdBeginExFW initProduceIdBeginEx = beginEx.initProducerId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
KafkaInitProducerIdBeginExFW initProduceIdBeginEx = beginEx.initProducerId();
KafkaInitProducerIdBeginExFW initProducerIdBeginEx = beginEx.initProducerId();

@jfallows jfallows merged commit 5b05c1f into aklivity:feature/mqtt-kafka Jan 11, 2024
0 of 3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants