Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qos2 idempotent producer #733

Merged
merged 44 commits into from
Feb 6, 2024

Conversation

bmaidics
Copy link
Contributor

No description provided.

@bmaidics bmaidics self-assigned this Jan 15, 2024
@bmaidics bmaidics changed the base branch from develop to feature/mqtt-kafka January 15, 2024 14:56
@bmaidics bmaidics marked this pull request as ready for review January 24, 2024 14:57
Copy link
Contributor

@jfallows jfallows left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cyclomatic complexity of the code has gone up quite a bit with these proposed changes.

MqttKafkaSessionFactory
  MqttSessionProxy
  -> KafkaOffsetFetchStream
  -> KafkaInitProducerStream
  -> KafkaOffsetCommitStream
     -> MqttKafkaPublishFactory.MqttPublishProxy
     -> MqttKafkaPublishFactory.KafkaProxy
  -> MqttKafkaPublishFactory.KafkaMessagesProxy
  -> MqttKafkaPublishFactory.KafkaRetainedProxy

MqttKafkaPublishFactory
  MqttPublishProxy
  -> MqttKafkaSessionFactory.MqttSessionProxy

In general, we don't want to reach across stream factories for behavior, even doing so for state should be kept to a minimum.

In this case, we had discussed a minor change to have session handle offset commit instead of publish, but this change seems to have more inter-stream behavioral dependencies than that.

Perhaps we should have a discussion about it to determine how best to simplify if possible.

@bmaidics bmaidics marked this pull request as draft January 31, 2024 17:42
Comment on lines +2016 to +2017
return new String16FW(BitUtil.toHex(offsetMetadata.buffer().byteArray(),
offsetMetadata.offset(), offsetMetadata.limit()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll want to optimize this at some point to avoid allocation during qos2 ack.

@jfallows jfallows marked this pull request as ready for review February 4, 2024 22:34
Copy link
Contributor

@jfallows jfallows left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks better!

jfallows
jfallows previously approved these changes Feb 6, 2024
@jfallows jfallows merged commit 3f33ed3 into aklivity:feature/mqtt-kafka Feb 6, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants