
Zilla crashes with IllegalArgumentException: cannot accept missingValue when using defaultOffset: live #1051

Closed
vordimous opened this issue May 24, 2024 · 1 comment · Fixed by #1052
Labels: bug (Something isn't working)

@vordimous (Contributor)

Describe the bug
Zilla crashes with IllegalArgumentException: cannot accept missingValue when using defaultOffset: live in the sse.kafka.fanout Zilla example.

To Reproduce
Steps to reproduce the behavior:

  1. Use the sse.kafka.fanout example.
  2. Add the defaultOffset: live config option to the kafka cache_server (a fuller zilla.yaml sketch follows the stack trace below):
      topics:
        - name: events
          defaultOffset: live
  3. Start the Zilla example.
  4. Zilla crashes with the following error:
org.agrona.concurrent.AgentTerminationException: java.lang.IllegalArgumentException: cannot accept missingValue
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:823)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
    at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalArgumentException: cannot accept missingValue
    at org.agrona.core/org.agrona.collections.Long2LongHashMap.putIfAbsent(Long2LongHashMap.java:271)
    at io.aklivity.zilla.runtime.binding.kafka@0.9.80/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheBootstrapFactory$KafkaBootstrapStream.onPartitionLeaderReady(KafkaCacheBootstrapFactory.java:784)
    at io.aklivity.zilla.runtime.binding.kafka@0.9.80/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheBootstrapFactory$KafkaBootstrapFetchStream.onFetchReplyBegin(KafkaCacheBootstrapFactory.java:1548)
    at io.aklivity.zilla.runtime.binding.kafka@0.9.80/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheBootstrapFactory$KafkaBootstrapFetchStream.onFetchReply(KafkaCacheBootstrapFactory.java:1514)
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleDefaultReadReply(EngineWorker.java:1497)
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadReply(EngineWorker.java:1428)
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1209)
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
    at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:817)
    ... 4 more
    Suppressed: java.lang.Exception: [engine/data#3]        [0x030300000000001c] streams=[consumeAt=0x00003930 (0x0000000000003930), produceAt=0x00003b60 (0x0000000000003b60)]
            at io.aklivity.zilla.runtime.engine@0.9.80/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:821)
            ... 4 more
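
For context, a minimal zilla.yaml sketch of where the option sits, assuming the binding, topic, and exit names used by the sse.kafka.fanout example (kafka_cache_server, events, kafka_client; the bootstrap list is likewise an assumption based on that example):

    bindings:
      kafka_cache_server:
        type: kafka
        kind: cache_server
        options:
          # bootstrap (assumed from the example) makes the cache_server proactively
          # fetch the topic, which is the code path that crashes above
          bootstrap:
            - events
          topics:
            - name: events
              defaultOffset: live
        exit: kafka_client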
@jfallows (Contributor)

Note that the fix for this bug will cause Zilla to start populating the cache from the most recent message in Kafka, but that does not impact the behavior of the sse-kafka mapping when there are multiple messages in the cache. It will still fetch older messages if they are already in the cache.
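
For reference, the sse-kafka mapping in that example is shaped roughly as follows; the binding name, path, and exit are assumptions based on the sse.kafka.fanout example:

      sse_kafka_proxy:
        type: sse-kafka
        kind: proxy
        routes:
          - when:
              - path: /events
            exit: kafka_cache_client
            with:
              topic: events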

#885 may also be needed to fully meet the needs of the scenario, such that each new SSE stream would start only from the latest message.

However, if the id field from the most recently received SSE message is sent back by the SSE client via the Last-Event-ID request header, then the message flow will pick up where it left off, without losing messages while disconnected or repeating messages already received.
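
On the client side this is typically automatic. A minimal TypeScript sketch using the browser's EventSource, assuming the example exposes SSE at http://localhost:7114/events (the URL and port are assumptions):

    // Connect to the example's SSE endpoint; adjust the URL to the actual http binding.
    const source = new EventSource('http://localhost:7114/events');

    source.onmessage = (event: MessageEvent) => {
      // event.lastEventId holds the id field of the most recent message; on an
      // automatic reconnect the browser sends it back in the Last-Event-ID
      // request header, so the stream resumes without loss or duplication.
      console.log('id:', event.lastEventId, 'data:', event.data);
    };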
