Skip to content

Commit 486beb0

Browse files
author
Davit Yeghshatyan
committed
Move ingester options to flags
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 788dc1d commit 486beb0

File tree

4 files changed

+89
-88
lines changed

4 files changed

+89
-88
lines changed

cmd/ingester/app/builder/builder.go

+10-58
Original file line numberDiff line numberDiff line change
@@ -16,67 +16,28 @@ package builder
1616

1717
import (
1818
"fmt"
19-
"strings"
2019

21-
"github.com/spf13/viper"
2220
"github.com/uber/jaeger-lib/metrics"
2321
"go.uber.org/zap"
2422

23+
"github.com/jaegertracing/jaeger/cmd/ingester/app"
2524
"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
2625
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
2726
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
2827
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
2928
"github.com/jaegertracing/jaeger/storage/spanstore"
3029
)
3130

32-
const (
33-
// EncodingJSON indicates spans are encoded as a json byte array
34-
EncodingJSON = "json"
35-
// EncodingProto indicates spans are encoded as a protobuf byte array
36-
EncodingProto = "protobuf"
37-
38-
// ConfigPrefix is a prefix fro the ingester flags
39-
ConfigPrefix = "ingester"
40-
// SuffixBrokers is a suffix for the brokers flag
41-
SuffixBrokers = ".brokers"
42-
// SuffixTopic is a suffix for the topic flag
43-
SuffixTopic = ".topic"
44-
// SuffixGroupID is a suffix for the group-id flag
45-
SuffixGroupID = ".group-id"
46-
// SuffixParallelism is a suffix for the parallelism flag
47-
SuffixParallelism = ".parallelism"
48-
// SuffixEncoding is a suffix for the encoding flag
49-
SuffixEncoding = ".encoding"
50-
51-
// DefaultBroker is the default kafka broker
52-
DefaultBroker = "127.0.0.1:9092"
53-
// DefaultTopic is the default kafka topic
54-
DefaultTopic = "jaeger-spans"
55-
// DefaultGroupID is the default consumer Group ID
56-
DefaultGroupID = "jaeger-ingester"
57-
// DefaultParallelism is the default parallelism for the span processor
58-
DefaultParallelism = 1000
59-
// DefaultEncoding is the default span encoding
60-
DefaultEncoding = EncodingProto
61-
)
62-
63-
// Builder stores the configuration options for the Ingester
64-
type Builder struct {
65-
kafkaConsumer.Configuration
66-
Parallelism int
67-
Encoding string
68-
}
69-
7031
// CreateConsumer creates a new span consumer for the ingester
71-
func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer) (*consumer.Consumer, error) {
32+
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
7233
var unmarshaller kafka.Unmarshaller
73-
if b.Encoding == EncodingJSON {
34+
if options.Encoding == app.EncodingJSON {
7435
unmarshaller = kafka.NewJSONUnmarshaller()
75-
} else if b.Encoding == EncodingProto {
36+
} else if options.Encoding == app.EncodingProto {
7637
unmarshaller = kafka.NewProtobufUnmarshaller()
7738
} else {
7839
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
79-
b.Encoding, EncodingProto, EncodingJSON)
40+
options.Encoding, app.EncodingProto, app.EncodingJSON)
8041
}
8142

8243
spParams := processor.SpanProcessorParams{
@@ -86,18 +47,18 @@ func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Fact
8647
spanProcessor := processor.NewSpanProcessor(spParams)
8748

8849
consumerConfig := kafkaConsumer.Configuration{
89-
Brokers: b.Brokers,
90-
Topic: b.Topic,
91-
GroupID: b.GroupID,
50+
Brokers: options.Brokers,
51+
Topic: options.Topic,
52+
GroupID: options.GroupID,
9253
}
9354
saramaConsumer, err := consumerConfig.NewConsumer()
9455
if err != nil {
9556
return nil, err
9657
}
9758

9859
factoryParams := consumer.ProcessorFactoryParams{
99-
Topic: b.Topic,
100-
Parallelism: b.Parallelism,
60+
Topic: options.Topic,
61+
Parallelism: options.Parallelism,
10162
SaramaConsumer: saramaConsumer,
10263
BaseProcessor: spanProcessor,
10364
Logger: logger,
@@ -116,12 +77,3 @@ func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Fact
11677
}
11778
return consumer.New(consumerParams)
11879
}
119-
120-
// InitFromViper initializes Builder with properties from viper
121-
func (b *Builder) InitFromViper(v *viper.Viper) {
122-
b.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
123-
b.Topic = v.GetString(ConfigPrefix + SuffixTopic)
124-
b.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
125-
b.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
126-
b.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
127-
}

cmd/ingester/app/flags.go

+62-12
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,80 @@ import (
1818
"flag"
1919
"fmt"
2020
"strconv"
21+
"strings"
2122

22-
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
23+
"github.com/spf13/viper"
24+
25+
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
26+
)
27+
28+
const (
29+
// EncodingJSON indicates spans are encoded as a json byte array
30+
EncodingJSON = "json"
31+
// EncodingProto indicates spans are encoded as a protobuf byte array
32+
EncodingProto = "protobuf"
33+
34+
// ConfigPrefix is a prefix fro the ingester flags
35+
ConfigPrefix = "ingester"
36+
// SuffixBrokers is a suffix for the brokers flag
37+
SuffixBrokers = ".brokers"
38+
// SuffixTopic is a suffix for the topic flag
39+
SuffixTopic = ".topic"
40+
// SuffixGroupID is a suffix for the group-id flag
41+
SuffixGroupID = ".group-id"
42+
// SuffixParallelism is a suffix for the parallelism flag
43+
SuffixParallelism = ".parallelism"
44+
// SuffixEncoding is a suffix for the encoding flag
45+
SuffixEncoding = ".encoding"
46+
47+
// DefaultBroker is the default kafka broker
48+
DefaultBroker = "127.0.0.1:9092"
49+
// DefaultTopic is the default kafka topic
50+
DefaultTopic = "jaeger-spans"
51+
// DefaultGroupID is the default consumer Group ID
52+
DefaultGroupID = "jaeger-ingester"
53+
// DefaultParallelism is the default parallelism for the span processor
54+
DefaultParallelism = 1000
55+
// DefaultEncoding is the default span encoding
56+
DefaultEncoding = EncodingProto
2357
)
2458

59+
// Options stores the configuration options for the Ingester
60+
type Options struct {
61+
kafkaConsumer.Configuration
62+
Parallelism int
63+
Encoding string
64+
}
65+
2566
// AddFlags adds flags for Builder
2667
func AddFlags(flagSet *flag.FlagSet) {
2768
flagSet.String(
28-
builder.ConfigPrefix+builder.SuffixBrokers,
29-
builder.DefaultBroker,
69+
ConfigPrefix+SuffixBrokers,
70+
DefaultBroker,
3071
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
3172
flagSet.String(
32-
builder.ConfigPrefix+builder.SuffixTopic,
33-
builder.DefaultTopic,
73+
ConfigPrefix+SuffixTopic,
74+
DefaultTopic,
3475
"The name of the kafka topic to consume from")
3576
flagSet.String(
36-
builder.ConfigPrefix+builder.SuffixGroupID,
37-
builder.DefaultGroupID,
77+
ConfigPrefix+SuffixGroupID,
78+
DefaultGroupID,
3879
"The Consumer Group that ingester will be consuming on behalf of")
3980
flagSet.String(
40-
builder.ConfigPrefix+builder.SuffixParallelism,
41-
strconv.Itoa(builder.DefaultParallelism),
81+
ConfigPrefix+SuffixParallelism,
82+
strconv.Itoa(DefaultParallelism),
4283
"The number of messages to process in parallel")
4384
flagSet.String(
44-
builder.ConfigPrefix+builder.SuffixEncoding,
45-
builder.DefaultEncoding,
46-
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, builder.EncodingProto, builder.EncodingJSON))
85+
ConfigPrefix+SuffixEncoding,
86+
DefaultEncoding,
87+
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
88+
}
89+
90+
// InitFromViper initializes Builder with properties from viper
91+
func (o *Options) InitFromViper(v *viper.Viper) {
92+
o.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
93+
o.Topic = v.GetString(ConfigPrefix + SuffixTopic)
94+
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
95+
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
96+
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
4797
}

cmd/ingester/app/flags_test.go

+14-15
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,36 @@ import (
1919

2020
"github.com/stretchr/testify/assert"
2121

22-
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
2322
"github.com/jaegertracing/jaeger/pkg/config"
2423
)
2524

2625
func TestOptionsWithFlags(t *testing.T) {
27-
b := &builder.Builder{}
26+
o := &Options{}
2827
v, command := config.Viperize(AddFlags)
2928
command.ParseFlags([]string{
3029
"--ingester.topic=topic1",
3130
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
3231
"--ingester.group-id=group1",
3332
"--ingester.parallelism=5",
3433
"--ingester.encoding=json"})
35-
b.InitFromViper(v)
34+
o.InitFromViper(v)
3635

37-
assert.Equal(t, "topic1", b.Topic)
38-
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, b.Brokers)
39-
assert.Equal(t, "group1", b.GroupID)
40-
assert.Equal(t, 5, b.Parallelism)
41-
assert.Equal(t, builder.EncodingJSON, b.Encoding)
36+
assert.Equal(t, "topic1", o.Topic)
37+
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
38+
assert.Equal(t, "group1", o.GroupID)
39+
assert.Equal(t, 5, o.Parallelism)
40+
assert.Equal(t, EncodingJSON, o.Encoding)
4241
}
4342

4443
func TestFlagDefaults(t *testing.T) {
45-
b := &builder.Builder{}
44+
o := &Options{}
4645
v, command := config.Viperize(AddFlags)
4746
command.ParseFlags([]string{})
48-
b.InitFromViper(v)
47+
o.InitFromViper(v)
4948

50-
assert.Equal(t, builder.DefaultTopic, b.Topic)
51-
assert.Equal(t, []string{builder.DefaultBroker}, b.Brokers)
52-
assert.Equal(t, builder.DefaultGroupID, b.GroupID)
53-
assert.Equal(t, builder.DefaultParallelism, b.Parallelism)
54-
assert.Equal(t, builder.DefaultEncoding, b.Encoding)
49+
assert.Equal(t, DefaultTopic, o.Topic)
50+
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
51+
assert.Equal(t, DefaultGroupID, o.GroupID)
52+
assert.Equal(t, DefaultParallelism, o.Parallelism)
53+
assert.Equal(t, DefaultEncoding, o.Encoding)
5554
}

cmd/ingester/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func main() {
8282
logger.Fatal("Failed to create span writer", zap.Error(err))
8383
}
8484

85-
b := builder.Builder{}
86-
b.InitFromViper(v)
87-
consumer, err := b.CreateConsumer(logger, metricsFactory, spanWriter)
85+
options := app.Options{}
86+
options.InitFromViper(v)
87+
consumer, err := builder.CreateConsumer(logger, metricsFactory, spanWriter, options)
8888
if err != nil {
8989
logger.Fatal("Unable to create consumer", zap.Error(err))
9090
}

0 commit comments

Comments
 (0)