
Commit 90e43d2 (committed Sep 11, 2019)

Improve configuration parameters for Kafka (#1359)

Signed-off-by: chandresh-pancholi <chandreshpancholi007@gmail.com>

Parent: 464acd0

File tree: 3 files changed, +248 -11 lines

 

pkg/kafka/producer/config.go  (+8 -2)

@@ -27,14 +27,20 @@ type Builder interface {
 
 // Configuration describes the configuration properties needed to create a Kafka producer
 type Configuration struct {
-	Brokers         []string
-	ProtocolVersion string
+	Brokers          []string
+	RequiredAcks     sarama.RequiredAcks
+	Compression      sarama.CompressionCodec
+	CompressionLevel int
+	ProtocolVersion  string
 	auth.AuthenticationConfig
 }
 
 // NewProducer creates a new asynchronous kafka producer
 func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
 	saramaConfig := sarama.NewConfig()
+	saramaConfig.Producer.RequiredAcks = c.RequiredAcks
+	saramaConfig.Producer.Compression = c.Compression
+	saramaConfig.Producer.CompressionLevel = c.CompressionLevel
 	saramaConfig.Producer.Return.Successes = true
 	c.AuthenticationConfig.SetConfiguration(saramaConfig)
 	if len(c.ProtocolVersion) > 0 {
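
The added fields map one-to-one onto Sarama's producer settings. As a rough sketch (not part of this commit) of how a caller could use the extended Configuration, assuming the import path github.com/jaegertracing/jaeger/pkg/kafka/producer, no authentication, and a broker reachable at 127.0.0.1:9092:

package main

import (
	"log"

	"github.com/Shopify/sarama"

	"github.com/jaegertracing/jaeger/pkg/kafka/producer"
)

func main() {
	// RequiredAcks, Compression and CompressionLevel are the fields added by this
	// commit; NewProducer copies them verbatim into the sarama.Config it builds.
	cfg := producer.Configuration{
		Brokers:          []string{"127.0.0.1:9092"},
		RequiredAcks:     sarama.WaitForAll,
		Compression:      sarama.CompressionGZIP,
		CompressionLevel: 6,
	}
	p, err := cfg.NewProducer()
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()
}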

plugin/storage/kafka/options.go  (+132 -8)

@@ -17,8 +17,10 @@ package kafka
 import (
 	"flag"
 	"fmt"
+	"log"
 	"strings"
 
+	"github.com/Shopify/sarama"
 	"github.com/spf13/viper"
 
 	"github.com/jaegertracing/jaeger/pkg/kafka/auth"
@@ -33,19 +35,69 @@ const (
 	// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
 	EncodingZipkinThrift = "zipkin-thrift"
 
-	configPrefix          = "kafka.producer"
-	suffixBrokers         = ".brokers"
-	suffixTopic           = ".topic"
-	suffixProtocolVersion = ".protocol-version"
-	suffixEncoding        = ".encoding"
-	defaultBroker         = "127.0.0.1:9092"
-	defaultTopic          = "jaeger-spans"
-	defaultEncoding       = EncodingProto
+	configPrefix           = "kafka.producer"
+	suffixBrokers          = ".brokers"
+	suffixTopic            = ".topic"
+	suffixEncoding         = ".encoding"
+	suffixRequiredAcks     = ".required-acks"
+	suffixCompression      = ".compression"
+	suffixCompressionLevel = ".compression-level"
+	suffixProtocolVersion  = ".protocol-version"
+
+	defaultBroker           = "127.0.0.1:9092"
+	defaultTopic            = "jaeger-spans"
+	defaultEncoding         = EncodingProto
+	defaultRequiredAcks     = "local"
+	defaultCompression      = "none"
+	defaultCompressionLevel = 0
 )
 
 var (
 	// AllEncodings is a list of all supported encodings.
 	AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
+
+	// requiredAcks maps the accepted CLI values to the RequiredAcks levels supported by sarama
+	requiredAcks = map[string]sarama.RequiredAcks{
+		"noack": sarama.NoResponse,
+		"local": sarama.WaitForLocal,
+		"all":   sarama.WaitForAll,
+	}
+
+	// compressionModes maps each supported compression type to its sarama CompressionCodec
+	// along with the default, min, and max compression levels, per
+	// https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression
+	compressionModes = map[string]struct {
+		compressor              sarama.CompressionCodec
+		defaultCompressionLevel int
+		minCompressionLevel     int
+		maxCompressionLevel     int
+	}{
+		"none": {
+			compressor:              sarama.CompressionNone,
+			defaultCompressionLevel: 0,
+		},
+		"gzip": {
+			compressor:              sarama.CompressionGZIP,
+			defaultCompressionLevel: 6,
+			minCompressionLevel:     1,
+			maxCompressionLevel:     9,
+		},
+		"snappy": {
+			compressor:              sarama.CompressionSnappy,
+			defaultCompressionLevel: 0,
+		},
+		"lz4": {
+			compressor:              sarama.CompressionLZ4,
+			defaultCompressionLevel: 9,
+			minCompressionLevel:     1,
+			maxCompressionLevel:     17,
+		},
+		"zstd": {
+			compressor:              sarama.CompressionZSTD,
+			defaultCompressionLevel: 3,
+			minCompressionLevel:     -131072,
+			maxCompressionLevel:     22,
+		},
+	}
 )
 
 // Options stores the configuration options for Kafka
@@ -74,15 +126,50 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
 		defaultEncoding,
 		fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
 	)
+	flagSet.String(
+		configPrefix+suffixRequiredAcks,
+		defaultRequiredAcks,
+		"(experimental) Required kafka broker acknowledgement, i.e. noack, local, all",
+	)
+	flagSet.String(
+		configPrefix+suffixCompression,
+		defaultCompression,
+		"(experimental) Type of compression (none, gzip, snappy, lz4, zstd) to use on messages",
+	)
+	flagSet.Int(
+		configPrefix+suffixCompressionLevel,
+		defaultCompressionLevel,
+		"(experimental) Compression level to use on messages: gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
+	)
 	auth.AddFlags(configPrefix, flagSet)
 }
 
 // InitFromViper initializes Options with properties from viper
 func (opt *Options) InitFromViper(v *viper.Viper) {
 	authenticationOptions := auth.AuthenticationConfig{}
 	authenticationOptions.InitFromViper(configPrefix, v)
+
+	requiredAcks, err := getRequiredAcks(v.GetString(configPrefix + suffixRequiredAcks))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	compressionMode := strings.ToLower(v.GetString(configPrefix + suffixCompression))
+	compressionModeCodec, err := getCompressionMode(compressionMode)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	compressionLevel, err := getCompressionLevel(compressionMode, v.GetInt(configPrefix+suffixCompressionLevel))
+	if err != nil {
+		log.Fatal(err)
+	}
+
 	opt.config = producer.Configuration{
 		Brokers:              strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","),
+		RequiredAcks:         requiredAcks,
+		Compression:          compressionModeCodec,
+		CompressionLevel:     compressionLevel,
 		ProtocolVersion:      v.GetString(configPrefix + suffixProtocolVersion),
 		AuthenticationConfig: authenticationOptions,
 	}
@@ -94,3 +181,40 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
 func stripWhiteSpace(str string) string {
 	return strings.Replace(str, " ", "", -1)
 }
+
+// getCompressionLevel resolves the effective compression level for the given compression type
+// and validates that it is within the supported range
+func getCompressionLevel(mode string, compressionLevel int) (int, error) {
+	compressionModeData, ok := compressionModes[mode]
+	if !ok {
+		return 0, fmt.Errorf("cannot find compression mode for compressionMode %v", mode)
+	}
+
+	if compressionLevel == defaultCompressionLevel {
+		return compressionModeData.defaultCompressionLevel, nil
+	}
+
+	if compressionModeData.minCompressionLevel > compressionLevel || compressionModeData.maxCompressionLevel < compressionLevel {
+		return 0, fmt.Errorf("compression level %d for '%s' is not within valid range [%d, %d]", compressionLevel, mode, compressionModeData.minCompressionLevel, compressionModeData.maxCompressionLevel)
+	}
+
+	return compressionLevel, nil
+}
+
+// getCompressionMode maps an input mode to its sarama CompressionCodec
+func getCompressionMode(mode string) (sarama.CompressionCodec, error) {
+	compressionMode, ok := compressionModes[mode]
+	if !ok {
+		return 0, fmt.Errorf("unknown compression mode: %v", mode)
+	}
+
+	return compressionMode.compressor, nil
+}
+
+// getRequiredAcks maps an input ack value to the corresponding sarama.RequiredAcks level
+func getRequiredAcks(acks string) (sarama.RequiredAcks, error) {
+	requiredAcks, ok := requiredAcks[strings.ToLower(acks)]
+	if !ok {
+		return 0, fmt.Errorf("unknown Required Ack: %s", acks)
+	}
+	return requiredAcks, nil
+}
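
From an operator's point of view, the three new flags sit under the existing kafka.producer prefix and are validated at startup: an unknown mode or an out-of-range level makes InitFromViper exit via log.Fatal. A hypothetical collector invocation using them, assuming Kafka span storage is selected through the SPAN_STORAGE_TYPE environment variable (the binary name and that variable are not part of this diff):

SPAN_STORAGE_TYPE=kafka ./jaeger-collector \
  --kafka.producer.brokers=127.0.0.1:9092 \
  --kafka.producer.required-acks=all \
  --kafka.producer.compression=gzip \
  --kafka.producer.compression-level=7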

plugin/storage/kafka/options_test.go  (+108 -1)

@@ -17,7 +17,9 @@ package kafka
 import (
 	"testing"
 
+	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/jaegertracing/jaeger/pkg/config"
 )
@@ -28,12 +30,18 @@ func TestOptionsWithFlags(t *testing.T) {
 	command.ParseFlags([]string{
 		"--kafka.producer.topic=topic1",
 		"--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234",
-		"--kafka.producer.encoding=protobuf"})
+		"--kafka.producer.encoding=protobuf",
+		"--kafka.producer.required-acks=local",
+		"--kafka.producer.compression=gzip",
+		"--kafka.producer.compression-level=7"})
 	opts.InitFromViper(v)
 
 	assert.Equal(t, "topic1", opts.topic)
 	assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
 	assert.Equal(t, "protobuf", opts.encoding)
+	assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
+	assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression)
+	assert.Equal(t, 7, opts.config.CompressionLevel)
 }
 
 func TestFlagDefaults(t *testing.T) {
@@ -45,4 +53,103 @@ func TestFlagDefaults(t *testing.T) {
 	assert.Equal(t, defaultTopic, opts.topic)
 	assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
 	assert.Equal(t, defaultEncoding, opts.encoding)
+	assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
+	assert.Equal(t, sarama.CompressionNone, opts.config.Compression)
+	assert.Equal(t, 0, opts.config.CompressionLevel)
+}
+
+func TestCompressionLevelDefaults(t *testing.T) {
+	compressionLevel, err := getCompressionLevel("none", defaultCompressionLevel)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("gzip", defaultCompressionLevel)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["gzip"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("snappy", defaultCompressionLevel)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("lz4", defaultCompressionLevel)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["lz4"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("zstd", defaultCompressionLevel)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["zstd"].defaultCompressionLevel, compressionLevel)
+}
+
+func TestCompressionLevel(t *testing.T) {
+	compressionLevel, err := getCompressionLevel("none", 0)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["none"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("gzip", 4)
+	require.NoError(t, err)
+	assert.Equal(t, 4, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("snappy", 0)
+	require.NoError(t, err)
+	assert.Equal(t, compressionModes["snappy"].defaultCompressionLevel, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("lz4", 10)
+	require.NoError(t, err)
+	assert.Equal(t, 10, compressionLevel)
+
+	compressionLevel, err = getCompressionLevel("zstd", 20)
+	require.NoError(t, err)
+	assert.Equal(t, 20, compressionLevel)
+}
+
+func TestFailedCompressionLevelScenario(t *testing.T) {
+	_, err := getCompressionLevel("gzip", 14)
+	assert.Error(t, err)
+
+	_, err = getCompressionLevel("lz4", 18)
+	assert.Error(t, err)
+
+	_, err = getCompressionLevel("zstd", 25)
+	assert.Error(t, err)
+
+	_, err = getCompressionLevel("test", 1)
+	assert.Error(t, err)
+}
+
+func TestCompressionModes(t *testing.T) {
+	compressionModes, err := getCompressionMode("gzip")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.CompressionGZIP, compressionModes)
+
+	compressionModes, err = getCompressionMode("snappy")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.CompressionSnappy, compressionModes)
+
+	compressionModes, err = getCompressionMode("none")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.CompressionNone, compressionModes)
+}
+
+func TestCompressionModeFailures(t *testing.T) {
+	_, err := getCompressionMode("test")
+	assert.Error(t, err)
+}
+
+func TestRequiredAcks(t *testing.T) {
+	acks, err := getRequiredAcks("noack")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.NoResponse, acks)
+
+	acks, err = getRequiredAcks("local")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.WaitForLocal, acks)
+
+	acks, err = getRequiredAcks("all")
+	require.NoError(t, err)
+	assert.Equal(t, sarama.WaitForAll, acks)
+}
+
+func TestRequiredAcksFailures(t *testing.T) {
+	_, err := getRequiredAcks("test")
+	assert.Error(t, err)
 }
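
Because the new helpers are unexported, these tests are the main entry point for exercising the validation logic. Assuming a standard Go toolchain run from the repository root, the added cases can be selected with a name filter such as (the regex is illustrative):

go test -v -run 'Compression|RequiredAcks' ./plugin/storage/kafka/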
