From 28eab683950a3e6e2d4f4495f9f5b5e40041763c Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Sun, 16 Feb 2025 17:01:55 +0530 Subject: [PATCH] fix: MQTT subscribe routing --- .../kafka/internal/stream/MqttKafkaSubscribeFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java index aaf843df7b..3f231e451b 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java @@ -337,12 +337,12 @@ private MqttSubscribeProxy( this.messages = new Long2ObjectHashMap<>(); routes.forEach(r -> { - KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this); + KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(routedId, r, this); messages.put(r.order, messagesProxy); messagesPerTopicKey.put(messagesProxy.topicKey, r.order); }); final MqttKafkaRouteConfig retainedRoute = routes.get(0); - this.retained = new KafkaRetainedProxy(originId, retainedRoute.id, retainedRoute.retained, this); + this.retained = new KafkaRetainedProxy(routedId, retainedRoute.id, retainedRoute.retained, this); } private void onMqttMessage( @@ -533,7 +533,7 @@ private void onFiltersChanged( final long routeOrder = r.order; if (!messages.containsKey(routeOrder)) { - KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this); + KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(routedId, r, this); messages.put(routeOrder, messagesProxy); messagesPerTopicKey.put(messagesProxy.topicKey, r.order); messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);