Skip to content

Commit a4c0365

Browse files
author
Davit Yeghshatyan
committed
Move ingester options to flags
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 788dc1d commit a4c0365

File tree

4 files changed

+88
-89
lines changed

4 files changed

+88
-89
lines changed

cmd/ingester/app/builder/builder.go

+10-59
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ package builder
1616

1717
import (
1818
"fmt"
19-
"strings"
20-
21-
"github.com/spf13/viper"
2219
"github.com/uber/jaeger-lib/metrics"
2320
"go.uber.org/zap"
2421

@@ -27,56 +24,19 @@ import (
2724
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
2825
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
2926
"github.com/jaegertracing/jaeger/storage/spanstore"
27+
"github.com/jaegertracing/jaeger/cmd/ingester/app"
3028
)
3129

32-
const (
33-
// EncodingJSON indicates spans are encoded as a json byte array
34-
EncodingJSON = "json"
35-
// EncodingProto indicates spans are encoded as a protobuf byte array
36-
EncodingProto = "protobuf"
37-
38-
// ConfigPrefix is a prefix fro the ingester flags
39-
ConfigPrefix = "ingester"
40-
// SuffixBrokers is a suffix for the brokers flag
41-
SuffixBrokers = ".brokers"
42-
// SuffixTopic is a suffix for the topic flag
43-
SuffixTopic = ".topic"
44-
// SuffixGroupID is a suffix for the group-id flag
45-
SuffixGroupID = ".group-id"
46-
// SuffixParallelism is a suffix for the parallelism flag
47-
SuffixParallelism = ".parallelism"
48-
// SuffixEncoding is a suffix for the encoding flag
49-
SuffixEncoding = ".encoding"
50-
51-
// DefaultBroker is the default kafka broker
52-
DefaultBroker = "127.0.0.1:9092"
53-
// DefaultTopic is the default kafka topic
54-
DefaultTopic = "jaeger-spans"
55-
// DefaultGroupID is the default consumer Group ID
56-
DefaultGroupID = "jaeger-ingester"
57-
// DefaultParallelism is the default parallelism for the span processor
58-
DefaultParallelism = 1000
59-
// DefaultEncoding is the default span encoding
60-
DefaultEncoding = EncodingProto
61-
)
62-
63-
// Builder stores the configuration options for the Ingester
64-
type Builder struct {
65-
kafkaConsumer.Configuration
66-
Parallelism int
67-
Encoding string
68-
}
69-
7030
// CreateConsumer creates a new span consumer for the ingester
71-
func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer) (*consumer.Consumer, error) {
31+
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
7232
var unmarshaller kafka.Unmarshaller
73-
if b.Encoding == EncodingJSON {
33+
if options.Encoding == app.EncodingJSON {
7434
unmarshaller = kafka.NewJSONUnmarshaller()
75-
} else if b.Encoding == EncodingProto {
35+
} else if options.Encoding == app.EncodingProto {
7636
unmarshaller = kafka.NewProtobufUnmarshaller()
7737
} else {
7838
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
79-
b.Encoding, EncodingProto, EncodingJSON)
39+
options.Encoding, app.EncodingProto, app.EncodingJSON)
8040
}
8141

8242
spParams := processor.SpanProcessorParams{
@@ -86,18 +46,18 @@ func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Fact
8646
spanProcessor := processor.NewSpanProcessor(spParams)
8747

8848
consumerConfig := kafkaConsumer.Configuration{
89-
Brokers: b.Brokers,
90-
Topic: b.Topic,
91-
GroupID: b.GroupID,
49+
Brokers: options.Brokers,
50+
Topic: options.Topic,
51+
GroupID: options.GroupID,
9252
}
9353
saramaConsumer, err := consumerConfig.NewConsumer()
9454
if err != nil {
9555
return nil, err
9656
}
9757

9858
factoryParams := consumer.ProcessorFactoryParams{
99-
Topic: b.Topic,
100-
Parallelism: b.Parallelism,
59+
Topic: options.Topic,
60+
Parallelism: options.Parallelism,
10161
SaramaConsumer: saramaConsumer,
10262
BaseProcessor: spanProcessor,
10363
Logger: logger,
@@ -116,12 +76,3 @@ func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Fact
11676
}
11777
return consumer.New(consumerParams)
11878
}
119-
120-
// InitFromViper initializes Builder with properties from viper
121-
func (b *Builder) InitFromViper(v *viper.Viper) {
122-
b.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
123-
b.Topic = v.GetString(ConfigPrefix + SuffixTopic)
124-
b.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
125-
b.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
126-
b.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
127-
}

cmd/ingester/app/flags.go

+61-12
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,78 @@ import (
1919
"fmt"
2020
"strconv"
2121

22-
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
22+
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
23+
"strings"
24+
"github.com/spf13/viper"
2325
)
2426

27+
const (
28+
// EncodingJSON indicates spans are encoded as a json byte array
29+
EncodingJSON = "json"
30+
// EncodingProto indicates spans are encoded as a protobuf byte array
31+
EncodingProto = "protobuf"
32+
33+
// ConfigPrefix is a prefix fro the ingester flags
34+
ConfigPrefix = "ingester"
35+
// SuffixBrokers is a suffix for the brokers flag
36+
SuffixBrokers = ".brokers"
37+
// SuffixTopic is a suffix for the topic flag
38+
SuffixTopic = ".topic"
39+
// SuffixGroupID is a suffix for the group-id flag
40+
SuffixGroupID = ".group-id"
41+
// SuffixParallelism is a suffix for the parallelism flag
42+
SuffixParallelism = ".parallelism"
43+
// SuffixEncoding is a suffix for the encoding flag
44+
SuffixEncoding = ".encoding"
45+
46+
// DefaultBroker is the default kafka broker
47+
DefaultBroker = "127.0.0.1:9092"
48+
// DefaultTopic is the default kafka topic
49+
DefaultTopic = "jaeger-spans"
50+
// DefaultGroupID is the default consumer Group ID
51+
DefaultGroupID = "jaeger-ingester"
52+
// DefaultParallelism is the default parallelism for the span processor
53+
DefaultParallelism = 1000
54+
// DefaultEncoding is the default span encoding
55+
DefaultEncoding = EncodingProto
56+
)
57+
58+
// Options stores the configuration options for the Ingester
59+
type Options struct {
60+
kafkaConsumer.Configuration
61+
Parallelism int
62+
Encoding string
63+
}
64+
2565
// AddFlags adds flags for Builder
2666
func AddFlags(flagSet *flag.FlagSet) {
2767
flagSet.String(
28-
builder.ConfigPrefix+builder.SuffixBrokers,
29-
builder.DefaultBroker,
68+
ConfigPrefix+SuffixBrokers,
69+
DefaultBroker,
3070
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
3171
flagSet.String(
32-
builder.ConfigPrefix+builder.SuffixTopic,
33-
builder.DefaultTopic,
72+
ConfigPrefix+SuffixTopic,
73+
DefaultTopic,
3474
"The name of the kafka topic to consume from")
3575
flagSet.String(
36-
builder.ConfigPrefix+builder.SuffixGroupID,
37-
builder.DefaultGroupID,
76+
ConfigPrefix+SuffixGroupID,
77+
DefaultGroupID,
3878
"The Consumer Group that ingester will be consuming on behalf of")
3979
flagSet.String(
40-
builder.ConfigPrefix+builder.SuffixParallelism,
41-
strconv.Itoa(builder.DefaultParallelism),
80+
ConfigPrefix+SuffixParallelism,
81+
strconv.Itoa(DefaultParallelism),
4282
"The number of messages to process in parallel")
4383
flagSet.String(
44-
builder.ConfigPrefix+builder.SuffixEncoding,
45-
builder.DefaultEncoding,
46-
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, builder.EncodingProto, builder.EncodingJSON))
84+
ConfigPrefix+SuffixEncoding,
85+
DefaultEncoding,
86+
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
87+
}
88+
89+
// InitFromViper initializes Builder with properties from viper
90+
func (o *Options) InitFromViper(v *viper.Viper) {
91+
o.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
92+
o.Topic = v.GetString(ConfigPrefix + SuffixTopic)
93+
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
94+
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
95+
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
4796
}

cmd/ingester/app/flags_test.go

+14-15
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,36 @@ import (
1919

2020
"github.com/stretchr/testify/assert"
2121

22-
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
2322
"github.com/jaegertracing/jaeger/pkg/config"
2423
)
2524

2625
func TestOptionsWithFlags(t *testing.T) {
27-
b := &builder.Builder{}
26+
o := &Options{}
2827
v, command := config.Viperize(AddFlags)
2928
command.ParseFlags([]string{
3029
"--ingester.topic=topic1",
3130
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
3231
"--ingester.group-id=group1",
3332
"--ingester.parallelism=5",
3433
"--ingester.encoding=json"})
35-
b.InitFromViper(v)
34+
o.InitFromViper(v)
3635

37-
assert.Equal(t, "topic1", b.Topic)
38-
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, b.Brokers)
39-
assert.Equal(t, "group1", b.GroupID)
40-
assert.Equal(t, 5, b.Parallelism)
41-
assert.Equal(t, builder.EncodingJSON, b.Encoding)
36+
assert.Equal(t, "topic1", o.Topic)
37+
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
38+
assert.Equal(t, "group1", o.GroupID)
39+
assert.Equal(t, 5, o.Parallelism)
40+
assert.Equal(t, EncodingJSON, o.Encoding)
4241
}
4342

4443
func TestFlagDefaults(t *testing.T) {
45-
b := &builder.Builder{}
44+
o := &Options{}
4645
v, command := config.Viperize(AddFlags)
4746
command.ParseFlags([]string{})
48-
b.InitFromViper(v)
47+
o.InitFromViper(v)
4948

50-
assert.Equal(t, builder.DefaultTopic, b.Topic)
51-
assert.Equal(t, []string{builder.DefaultBroker}, b.Brokers)
52-
assert.Equal(t, builder.DefaultGroupID, b.GroupID)
53-
assert.Equal(t, builder.DefaultParallelism, b.Parallelism)
54-
assert.Equal(t, builder.DefaultEncoding, b.Encoding)
49+
assert.Equal(t, DefaultTopic, o.Topic)
50+
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
51+
assert.Equal(t, DefaultGroupID, o.GroupID)
52+
assert.Equal(t, DefaultParallelism, o.Parallelism)
53+
assert.Equal(t, DefaultEncoding, o.Encoding)
5554
}

cmd/ingester/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func main() {
8282
logger.Fatal("Failed to create span writer", zap.Error(err))
8383
}
8484

85-
b := builder.Builder{}
86-
b.InitFromViper(v)
87-
consumer, err := b.CreateConsumer(logger, metricsFactory, spanWriter)
85+
options := app.Options{}
86+
options.InitFromViper(v)
87+
consumer, err := builder.CreateConsumer(logger, metricsFactory, spanWriter, options)
8888
if err != nil {
8989
logger.Fatal("Unable to create consumer", zap.Error(err))
9090
}

0 commit comments

Comments
 (0)