OTel Trace from kafka source with otel_trace_raw #5446

Open
Xsthr opened this issue Feb 20, 2025 · 4 comments
Labels
enhancement New feature or request

Comments

@Xsthr

Xsthr commented Feb 20, 2025

I set up a tracing pipeline, Otelcol -> DataPrepper -> OpenSearch, with Kafka used as a buffer in DataPrepper. This setup works without any issues.
I then needed to remove the Kafka buffer and use Kafka as the trace source instead, with otelcol writing the traces to the Kafka topic.
Configuration:

entry-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - kafka_host:9092
      topics:
        - name: test_trace_topic
          group_id: trace-test
      encryption:
        type: none 
  sink:
    - pipeline:
        name: "raw-trace-pipeline"
    - pipeline:
        name: "service-map-pipeline"

raw-trace-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 65536
      batch_size: 8
  processor:
    - otel_trace_raw:
    - otel_trace_group:
        hosts: ["http://OpenSearch_host"]
        insecure: true
        username: admin
        password: password
  sink:
    - opensearch:
        hosts: ["http://OpenSearch_host"]
        insecure: true
        username: admin
        password: password
        index_type: trace-analytics-raw

service-map-pipeline:
  delay: "100"
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 65536
      batch_size: 8
  processor:
    - otel_trace_raw:
    - otel_trace_group:
        hosts: ["http://OpenSearch_host"]
        insecure: true
        username: admin
        password: password
  sink:
    - opensearch:
        hosts: ["http://OpenSearch_host"]
        insecure: true
        username: admin
        password: password
        index_type: trace-analytics-raw

service-map-pipeline:
  delay: "100"
  source:
    pipeline:
      name: "entry-pipeline"
  buffer:
    bounded_blocking:
      buffer_size: 65536
      batch_size: 8
  processor:
    - service_map_stateful:
  sink:
    - opensearch:
        hosts: ["http://OpenSearch_host"]
        insecure: true
        username: admin
        password: password
        index_type: trace-analytics-service-map

When the otel_trace_raw processor executes, DataPrepper reports processing errors: the incoming events cannot be cast to Span.
Error log:

2025-02-20T15:56:09,339 [raw-trace-pipeline-processor-worker-3-thread-1] ERROR org.opensearch.dataprepper.pipeline.ProcessWorker - A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: 
java.lang.ClassCastException: class org.opensearch.dataprepper.model.log.JacksonLog cannot be cast to class org.opensearch.dataprepper.model.trace.Span (org.opensearch.dataprepper.model.log.JacksonLog and org.opensearch.dataprepper.model.trace.Span are in unnamed module of loader 'app')
        at org.opensearch.dataprepper.plugins.processor.oteltrace.OTelTraceRawProcessor.doExecute(OTelTraceRawProcessor.java:89) ~[otel-trace-raw-processor-2.10.1.jar:?]
        at org.opensearch.dataprepper.model.processor.AbstractProcessor.lambda$execute$0(AbstractProcessor.java:54) ~[data-prepper-api-2.10.1.jar:?]
        at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69) ~[micrometer-core-1.13.0.jar:1.13.0]
        at org.opensearch.dataprepper.model.processor.AbstractProcessor.execute(AbstractProcessor.java:54) ~[data-prepper-api-2.10.1.jar:?]
        at org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator.execute(PeerForwardingProcessorDecorator.java:103) ~[data-prepper-core-2.10.1.jar:?]
        at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:139) [data-prepper-core-2.10.1.jar:?]
        at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:61) [data-prepper-core-2.10.1.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
2025-02-20T15:56:09,426 [service-map-pipeline-processor-worker-5-thread-1] ERROR org.opensearch.dataprepper.pipeline.ProcessWorker - A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: 
java.lang.ClassCastException: class org.opensearch.dataprepper.model.event.JacksonEvent cannot be cast to class org.opensearch.dataprepper.model.trace.Span (org.opensearch.dataprepper.model.event.JacksonEvent and org.opensearch.dataprepper.model.trace.Span are in unnamed module of loader 'app')
        at org.opensearch.dataprepper.plugins.processor.ServiceMapStatefulProcessor.lambda$doExecute$5(ServiceMapStatefulProcessor.java:152) ~[service-map-stateful-2.10.1.jar:?]
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
        at org.opensearch.dataprepper.plugins.processor.ServiceMapStatefulProcessor.doExecute(ServiceMapStatefulProcessor.java:152) ~[service-map-stateful-2.10.1.jar:?]
        at org.opensearch.dataprepper.model.processor.AbstractProcessor.lambda$execute$0(AbstractProcessor.java:54) ~[data-prepper-api-2.10.1.jar:?]
        at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69) ~[micrometer-core-1.13.0.jar:1.13.0]
        at org.opensearch.dataprepper.model.processor.AbstractProcessor.execute(AbstractProcessor.java:54) ~[data-prepper-api-2.10.1.jar:?]
        at org.opensearch.dataprepper.peerforwarder.PeerForwardingProcessorDecorator.execute(PeerForwardingProcessorDecorator.java:103) ~[data-prepper-core-2.10.1.jar:?]
        at org.opensearch.dataprepper.pipeline.ProcessWorker.doRun(ProcessWorker.java:139) [data-prepper-core-2.10.1.jar:?]
        at org.opensearch.dataprepper.pipeline.ProcessWorker.run(ProcessWorker.java:61) [data-prepper-core-2.10.1.jar:?]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]

Is there a solution to this problem? DataPrepper can process the data obtained from Kafka when Kafka is used as a buffer, but it cannot process the same data when Kafka is used as the source. This looks like a bug.
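
Both ClassCastException entries in the log come down to a processor casting every event in a batch to Span. Below is a minimal Java sketch of that failure mode. The types Event, Span and LogEvent are simplified stand-ins invented for illustration (LogEvent plays the role of JacksonLog), and CastDemo is a hypothetical name; none of this is the actual Data Prepper code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; the types below are stand-ins, NOT Data Prepper classes.
class CastDemo {
    // Mimics a trace processor that assumes every incoming event is a Span.
    static List<String> traceIds(List<Event> batch) {
        List<String> ids = new ArrayList<>();
        for (Event event : batch) {
            Span span = (Span) event; // throws ClassCastException for any non-Span event
            ids.add(span.traceId);
        }
        return ids;
    }

    public static void main(String[] args) {
        // A batch of real spans is processed normally.
        System.out.println(traceIds(List.of(new Span("abc"))));

        // A batch of generic log events fails on the cast.
        try {
            traceIds(List.of(new LogEvent("raw kafka record")));
        } catch (ClassCastException ex) {
            System.out.println("batch dropped: " + ex.getMessage());
        }
    }
}

// Stand-in for the common event type.
interface Event {}

// Stand-in for org.opensearch.dataprepper.model.trace.Span.
final class Span implements Event {
    final String traceId;
    Span(String traceId) { this.traceId = traceId; }
}

// Stand-in for org.opensearch.dataprepper.model.log.JacksonLog.
final class LogEvent implements Event {
    final String message;
    LogEvent(String message) { this.message = message; }
}
```

In this sketch the cast fails regardless of what the payload contains: the processor only looks at the runtime type of the event, so the source has to produce Span objects for the processor to accept them.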

Environment (please complete the following information):

  • OS: Debian 12
  • DataPrepper version: 2.10.1
@dlvenable
Member

dlvenable commented Feb 25, 2025

@Xsthr, the kafka source does not currently support OTel. We need to add support for reading this data through a codec.

@kkondaka is working on providing this new feature.

@dlvenable added the enhancement label and removed the bug and untriaged labels on Feb 25, 2025
@dlvenable
Member

@Xsthr To support this, the kafka source needs to use the codec added in #5030.

If you are interested in working on this, you could take it up.

@Xsthr
Author

Xsthr commented Mar 12, 2025

@dlvenable The codec mentioned in #5030 does not solve this problem: it uses the otel_logs codec, which does not address reading trace data from Kafka. The otel_traces processor cannot process the data because it expects input of class org.opensearch.dataprepper.model.trace.Span, which is why the error persists. The kafka source needs a codec parameter, something like codec: otel_trace_in_kafka_source, so that the data is converted from org.opensearch.dataprepper.model.log.JacksonLog to org.opensearch.dataprepper.model.trace.Span.
Can you please explain why there is no problem with kafka_buffer, but it doesn't work with kafka_source?

@dlvenable changed the title from "[BUG] Trace from kafka source not working with otel_trace_raw" to "Trace from kafka source with otel_trace_raw" on Mar 13, 2025
@dlvenable changed the title from "Trace from kafka source with otel_trace_raw" to "OTel Trace from kafka source with otel_trace_raw" on Mar 13, 2025
@dlvenable
Member

@Xsthr, this works with the Kafka buffer but not with Kafka as a direct source because, in the buffer setup, the data has already been processed by the otel_traces source before it is put into Kafka.
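
The difference between the two setups, as described in the comment above, can be modeled roughly as follows. This is only a simplified sketch and not Data Prepper internals: IngestPathsDemo, PipelineEvent, SpanEvent, GenericEvent and decodeAsSpan are all hypothetical names, and the "decoding" is a placeholder rather than real OTLP parsing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the two ingestion paths; not actual Data Prepper code.
class IngestPathsDemo {
    interface PipelineEvent {}

    // Stand-in for a typed trace span.
    static final class SpanEvent implements PipelineEvent {
        final String payload;
        SpanEvent(String payload) { this.payload = payload; }
    }

    // Stand-in for a generic event produced without any OTel decoding.
    static final class GenericEvent implements PipelineEvent {
        final String payload;
        GenericEvent(String payload) { this.payload = payload; }
    }

    // Placeholder for the work an OTel-aware source does (assumption: no real OTLP parsing here).
    static PipelineEvent decodeAsSpan(String payload) {
        return new SpanEvent(payload);
    }

    // Path 1: an OTel-aware source handles the data, Kafka only sits in between as a buffer.
    static PipelineEvent viaBuffer(String payload) {
        Deque<PipelineEvent> buffer = new ArrayDeque<>();
        buffer.add(decodeAsSpan(payload));
        return buffer.poll();
    }

    // Path 2: Kafka is the source and has no OTel codec, so the record stays generic.
    static PipelineEvent viaKafkaSource(String payload) {
        return new GenericEvent(payload);
    }

    // A trace processor in this model only accepts span-typed events.
    static boolean acceptedByTraceProcessor(PipelineEvent event) {
        return event instanceof SpanEvent;
    }

    public static void main(String[] args) {
        System.out.println("buffer path accepted: " + acceptedByTraceProcessor(viaBuffer("otlp-bytes")));
        System.out.println("source path accepted: " + acceptedByTraceProcessor(viaKafkaSource("otlp-bytes")));
    }
}
```

In this model the payload is identical on both paths; what differs is whether an OTel-aware component had the chance to turn it into a span before the trace processor sees it, which is the gap a codec on the kafka source would close.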
