Skip to content

Commit 35210fd

Browse files
author
Davit Yeghshatyan
committed
Improve span processor
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 94479c7 commit 35210fd

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

cmd/ingester/app/processor/span_processor.go

+20-13
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,42 @@ import (
2323
"github.com/jaegertracing/jaeger/storage/spanstore"
2424
)
2525

26-
//go:generate mockery -name=SpanProcessor
26+
//go:generate mockery -name=KafkaSpanProcessor
2727

2828
// SpanProcessor processes kafka spans
2929
type SpanProcessor interface {
3030
Process(input Message) error
3131
io.Closer
3232
}
3333

34-
type spanProcessor struct {
35-
unmarshaller kafka.Unmarshaller
36-
writer spanstore.Writer
37-
io.Closer
38-
}
39-
4034
// Message contains the fields of the kafka message that the span processor uses
4135
type Message interface {
4236
Value() []byte
4337
}
4438

45-
// NewSpanProcessor creates a new SpanProcessor
46-
func NewSpanProcessor(writer spanstore.Writer, unmarshaller kafka.Unmarshaller) SpanProcessor {
47-
return &spanProcessor{
48-
unmarshaller: unmarshaller,
49-
writer: writer,
39+
// SpanProcessorParams stores the necessary parameters for a SpanProcessor
40+
type SpanProcessorParams struct {
41+
Writer spanstore.Writer
42+
Unmarshaller kafka.Unmarshaller
43+
}
44+
45+
// KafkaSpanProcessor implements SpanProcessor for Kafka messages
46+
type KafkaSpanProcessor struct {
47+
unmarshaller kafka.Unmarshaller
48+
writer spanstore.Writer
49+
io.Closer
50+
}
51+
52+
// NewSpanProcessor creates a new KafkaSpanProcessor
53+
func NewSpanProcessor(params SpanProcessorParams) KafkaSpanProcessor {
54+
return KafkaSpanProcessor{
55+
unmarshaller: params.Unmarshaller,
56+
writer: params.Writer,
5057
}
5158
}
5259

5360
// Process unmarshals and writes a single kafka message
54-
func (s spanProcessor) Process(message Message) error {
61+
func (s KafkaSpanProcessor) Process(message Message) error {
5562
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
5663
if err != nil {
5764
return errors.Wrap(err, "cannot unmarshall byte array into span")

cmd/ingester/app/processor/span_processor_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@ import (
2727
)
2828

2929
func TestNewSpanProcessor(t *testing.T) {
30-
assert.NotNil(t, NewSpanProcessor(&smocks.Writer{}, &umocks.Unmarshaller{}))
30+
p := SpanProcessorParams{}
31+
assert.NotNil(t, NewSpanProcessor(p))
3132
}
3233

3334
func TestSpanProcessor_Process(t *testing.T) {
3435
writer := &smocks.Writer{}
3536
unmarshallerMock := &umocks.Unmarshaller{}
36-
processor := &spanProcessor{
37+
processor := &KafkaSpanProcessor{
3738
unmarshaller: unmarshallerMock,
3839
writer: writer,
3940
}
@@ -55,7 +56,7 @@ func TestSpanProcessor_Process(t *testing.T) {
5556
func TestSpanProcessor_ProcessError(t *testing.T) {
5657
writer := &smocks.Writer{}
5758
unmarshallerMock := &umocks.Unmarshaller{}
58-
processor := &spanProcessor{
59+
processor := &KafkaSpanProcessor{
5960
unmarshaller: unmarshallerMock,
6061
writer: writer,
6162
}

0 commit comments

Comments
 (0)