Skip to content

Commit 2fa6b82

Browse files
author
Davit Yeghshatyan
committed
Take internalConsumer as a parameter
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 259107d commit 2fa6b82

File tree

4 files changed

+40
-38
lines changed

4 files changed

+40
-38
lines changed

cmd/ingester/app/consumer/consumer.go

+12-26
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@ import (
2828

2929
// Configuration stores the configurable options of a Consumer
3030
type Configuration struct {
31-
kafkaConsumer.Configuration
31+
Topic string
3232
Parallelism int
3333
}
3434

3535
// Params are the parameters of a Consumer
3636
type Params struct {
37-
BaseProcessor processor.SpanProcessor
38-
Config Configuration
37+
BaseProcessor processor.SpanProcessor
38+
InternalConsumer kafkaConsumer.Consumer
39+
Config Configuration
3940

4041
MetricsFactory metrics.Factory
4142
Logger *zap.Logger
@@ -55,36 +56,21 @@ type Consumer struct {
5556

5657
// New is a constructor for a Consumer
5758
func New(params Params) (*Consumer, error) {
58-
consumerConfig := kafkaConsumer.Configuration{
59-
Brokers: params.Config.Brokers,
60-
Topic: params.Config.Topic,
61-
GroupID: params.Config.GroupID,
62-
}
63-
saramaConsumer, err := consumerConfig.NewConsumer()
64-
if err != nil {
65-
return nil, err
66-
}
67-
6859
factoryParams := ProcessorFactoryParams{
69-
Topic: params.Config.Topic,
70-
Parallelism: params.Config.Parallelism,
71-
SaramaConsumer: saramaConsumer,
72-
BaseProcessor: params.BaseProcessor,
73-
Logger: params.Logger,
74-
Factory: params.MetricsFactory,
60+
Topic: params.Config.Topic,
61+
Parallelism: params.Config.Parallelism,
62+
InternalConsumer: params.InternalConsumer,
63+
BaseProcessor: params.BaseProcessor,
64+
Logger: params.Logger,
65+
Factory: params.MetricsFactory,
7566
}
76-
processorFactory, err := NewProcessorFactory(factoryParams)
77-
if err != nil {
78-
return nil, err
79-
}
80-
8167
return &Consumer{
68+
internalConsumer: params.InternalConsumer,
69+
processorFactory: NewProcessorFactory(factoryParams),
8270
metricsFactory: params.MetricsFactory,
8371
logger: params.Logger,
8472
close: make(chan struct{}, 1),
8573
isClosed: sync.WaitGroup{},
86-
internalConsumer: saramaConsumer,
87-
processorFactory: processorFactory,
8874
}, nil
8975
}
9076

cmd/ingester/app/consumer/processor_factory.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ import (
2828

2929
// ProcessorFactoryParams are the parameters of a ProcessorFactory
3030
type ProcessorFactoryParams struct {
31-
Parallelism int
32-
Topic string
33-
BaseProcessor processor.SpanProcessor
34-
SaramaConsumer consumer.Consumer
35-
Factory metrics.Factory
36-
Logger *zap.Logger
31+
Parallelism int
32+
Topic string
33+
BaseProcessor processor.SpanProcessor
34+
InternalConsumer consumer.Consumer
35+
Factory metrics.Factory
36+
Logger *zap.Logger
3737
}
3838

3939
// ProcessorFactory is a factory for creating startedProcessors
@@ -47,15 +47,15 @@ type ProcessorFactory struct {
4747
}
4848

4949
// NewProcessorFactory constructs a new ProcessorFactory
50-
func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
50+
func NewProcessorFactory(params ProcessorFactoryParams) *ProcessorFactory {
5151
return &ProcessorFactory{
5252
topic: params.Topic,
53-
consumer: params.SaramaConsumer,
53+
consumer: params.InternalConsumer,
5454
metricsFactory: params.Factory,
5555
logger: params.Logger,
5656
baseProcessor: params.BaseProcessor,
5757
parallelism: params.Parallelism,
58-
}, nil
58+
}
5959
}
6060

6161
func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {

cmd/ingester/app/options.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
"github.com/spf13/viper"
2424

25-
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
25+
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
2626
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
2727
)
2828

@@ -46,8 +46,9 @@ const (
4646

4747
// Options stores the configuration options for a Kafka consumer
4848
type Options struct {
49-
consumer.Configuration
49+
kafkaConsumer.Configuration
5050
Encoding string
51+
Parallelism int
5152
}
5253

5354
// AddFlags adds flags for Options

cmd/ingester/main.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/jaegertracing/jaeger/cmd/env"
3030
"github.com/jaegertracing/jaeger/cmd/flags"
3131
"github.com/jaegertracing/jaeger/cmd/ingester/app"
32+
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
3233
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
3334
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
3435
"github.com/jaegertracing/jaeger/pkg/config"
@@ -92,11 +93,25 @@ func main() {
9293
}
9394
spanProcessor := processor.NewSpanProcessor(spParams)
9495

96+
consumerConfig := kafkaConsumer.Configuration{
97+
Brokers: options.Brokers,
98+
Topic: options.Topic,
99+
GroupID: options.GroupID,
100+
}
101+
saramaConsumer, err := consumerConfig.NewConsumer()
102+
if err != nil {
103+
logger.Fatal("Failed to create sarama consumer", zap.Error(err))
104+
}
105+
95106
consumerParams := consumer.Params{
96107
MetricsFactory: metricsFactory,
97108
Logger: logger,
98109
BaseProcessor: spanProcessor,
99-
Config: options.Configuration,
110+
InternalConsumer: saramaConsumer,
111+
Config: consumer.Configuration{
112+
Topic: options.Topic,
113+
Parallelism: options.Parallelism,
114+
},
100115
}
101116
spanConsumer, err := consumer.New(consumerParams)
102117
if err != nil {

0 commit comments

Comments
 (0)