Skip to content

Commit 9ffac74

Browse files
author
mhoffmann
committed
WIP AddFlags and InitFromViper
1 parent 89402a8 commit 9ffac74

File tree

10 files changed

+174
-159
lines changed

10 files changed

+174
-159
lines changed

cmd/ingester/app/flags.go

+7-76
Original file line numberDiff line numberDiff line change
@@ -32,47 +32,14 @@ const (
3232
ConfigPrefix = "ingester"
3333
// KafkaConsumerConfigPrefix is a prefix for the Kafka flags
3434
KafkaConsumerConfigPrefix = "kafka.consumer"
35-
// SuffixBrokers is a suffix for the brokers flag
36-
SuffixBrokers = ".brokers"
37-
// SuffixTopic is a suffix for the topic flag
38-
SuffixTopic = ".topic"
39-
// SuffixGroupID is a suffix for the group-id flag
40-
SuffixGroupID = ".group-id"
41-
// SuffixClientID is a suffix for the client-id flag
42-
SuffixClientID = ".client-id"
35+
4336
// SuffixEncoding is a suffix for the encoding flag
4437
SuffixEncoding = ".encoding"
45-
// SuffixTLS is a suffix for the tls flag
46-
SuffixTLS = ".tls"
47-
// SuffixCert is a suffix for the tls certificate path flag
48-
SuffixCert = ".tls.cert"
49-
// SuffixKey is a suffix for the tls key path flag
50-
SuffixKey = ".tls.key"
51-
// SuffixCA is a suffix for the tls ca path flag
52-
SuffixCA = ".tls.ca"
5338
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
5439
SuffixDeadlockInterval = ".deadlockInterval"
5540
// SuffixParallelism is a suffix for the parallelism flag
5641
SuffixParallelism = ".parallelism"
57-
// SuffixHTTPPort is a suffix for the HTTP port
58-
SuffixHTTPPort = ".http-port"
5942

60-
// DefaultBroker is the default kafka broker
61-
DefaultBroker = "127.0.0.1:9092"
62-
// DefaultTopic is the default kafka topic
63-
DefaultTopic = "jaeger-spans"
64-
// DefaultGroupID is the default consumer Group ID
65-
DefaultGroupID = "jaeger-ingester"
66-
// DefaultClientID is the default consumer Client ID
67-
DefaultClientID = "jaeger-ingester"
68-
// DefaultTLS is the default for TLS enabled
69-
DefaultTLS = false
70-
// DefaultCAPath is the default for the TLS CA path
71-
DefaultCAPath = ""
72-
// DefaultCertPath is the default for the TLS Cert path
73-
DefaultCertPath = ""
74-
// DefaultKeyPath is the default for the TLS key path
75-
DefaultKeyPath = ""
7643
// DefaultParallelism is the default parallelism for the span processor
7744
DefaultParallelism = 1000
7845
// DefaultEncoding is the default span encoding
@@ -84,49 +51,21 @@ const (
8451
// Options stores the configuration options for the Ingester
8552
type Options struct {
8653
kafkaConsumer.Configuration
54+
8755
Parallelism int
8856
Encoding string
8957
DeadlockInterval time.Duration
9058
}
9159

9260
// AddFlags adds flags for Builder
9361
func AddFlags(flagSet *flag.FlagSet) {
94-
flagSet.String(
95-
KafkaConsumerConfigPrefix+SuffixBrokers,
96-
DefaultBroker,
97-
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
98-
flagSet.String(
99-
KafkaConsumerConfigPrefix+SuffixTopic,
100-
DefaultTopic,
101-
"The name of the kafka topic to consume from")
102-
flagSet.String(
103-
KafkaConsumerConfigPrefix+SuffixGroupID,
104-
DefaultGroupID,
105-
"The Consumer Group that ingester will be consuming on behalf of")
106-
flagSet.String(
107-
KafkaConsumerConfigPrefix+SuffixClientID,
108-
DefaultClientID,
109-
"The Consumer Client ID that ingester will use")
62+
kafkaConsumer.AddFlags(KafkaConsumerConfigPrefix, flagSet)
63+
11064
flagSet.String(
11165
KafkaConsumerConfigPrefix+SuffixEncoding,
11266
DefaultEncoding,
11367
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
114-
flagSet.Bool(
115-
KafkaConsumerConfigPrefix+SuffixTLS,
116-
DefaultTLS,
117-
"Enable TLS for the Kafka connection")
118-
flagSet.String(
119-
KafkaConsumerConfigPrefix+SuffixCA,
120-
DefaultCAPath,
121-
"Path to the TLS CA for the Kafka connection")
122-
flagSet.String(
123-
KafkaConsumerConfigPrefix+SuffixCert,
124-
DefaultCertPath,
125-
"Path to the TLS Certificate for the Kafka connection")
126-
flagSet.String(
127-
KafkaConsumerConfigPrefix+SuffixKey,
128-
DefaultKeyPath,
129-
"Path to the TLS Key for the Kafka connection")
68+
13069
flagSet.String(
13170
ConfigPrefix+SuffixParallelism,
13271
strconv.Itoa(DefaultParallelism),
@@ -139,17 +78,9 @@ func AddFlags(flagSet *flag.FlagSet) {
13978

14079
// InitFromViper initializes Builder with properties from viper
14180
func (o *Options) InitFromViper(v *viper.Viper) {
142-
o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",")
143-
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
144-
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
145-
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
146-
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)
81+
o.Configuration.InitFromViper(KafkaConsumerConfigPrefix, v)
14782

83+
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)
14884
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
14985
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
15086
}
151-
152-
// stripWhiteSpace removes all whitespace characters from a string
153-
func stripWhiteSpace(str string) string {
154-
return strings.Replace(str, " ", "", -1)
155-
}

cmd/ingester/app/flags_test.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package app
1616

1717
import (
18+
kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/config"
19+
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
1820
"testing"
1921
"time"
2022

@@ -53,10 +55,10 @@ func TestFlagDefaults(t *testing.T) {
5355
command.ParseFlags([]string{})
5456
o.InitFromViper(v)
5557

56-
assert.Equal(t, DefaultTopic, o.Topic)
57-
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
58-
assert.Equal(t, DefaultGroupID, o.GroupID)
59-
assert.Equal(t, DefaultClientID, o.ClientID)
58+
assert.Equal(t, consumer.DefaultTopic, o.Topic)
59+
assert.Equal(t, []string{kafkaConfig.DefaultBroker}, o.Brokers)
60+
assert.Equal(t, consumer.DefaultGroupID, o.GroupID)
61+
assert.Equal(t, consumer.DefaultClientID, o.ClientID)
6062
assert.Equal(t, DefaultParallelism, o.Parallelism)
6163
assert.Equal(t, DefaultEncoding, o.Encoding)
6264
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)

pkg/es/config/.nocover

-1
This file was deleted.

pkg/kafka/config/.nocover

-1
This file was deleted.

pkg/kafka/config/config.go

+81-23
Original file line numberDiff line numberDiff line change
@@ -17,56 +17,114 @@ package config
1717
import (
1818
"crypto/tls"
1919
"crypto/x509"
20+
"flag"
21+
"github.com/pkg/errors"
22+
"github.com/spf13/viper"
2023
"io/ioutil"
24+
"strings"
25+
)
26+
27+
const (
28+
// SuffixBrokers is a suffix for the brokers flag
29+
SuffixBrokers = ".brokers"
30+
// SuffixTopic is a suffix for the topic flag
31+
SuffixTLS = ".tls"
32+
// SuffixCert is a suffix for the tls certificate path flag
33+
SuffixCert = ".tls.cert"
34+
// SuffixKey is a suffix for the tls key path flag
35+
SuffixKey = ".tls.key"
36+
// SuffixCA is a suffix for the tls ca path flag
37+
SuffixCA = ".tls.ca"
38+
39+
// DefaultBroker is the default kafka broker
40+
DefaultBroker = "127.0.0.1:9092"
41+
// DefaultTLS is the default for TLS enabled
42+
DefaultTLS = false
43+
// DefaultCAPath is the default for the TLS CA path
44+
DefaultCAPath = ""
45+
// DefaultCertPath is the default for the TLS Cert path
46+
DefaultCertPath = ""
47+
// DefaultKeyPath is the default for the TLS key path
48+
DefaultKeyPath = ""
2149
)
2250

2351
// Configuration describes the shared configuration options for Producers or Consumers
2452
type Configuration struct {
2553
Brokers []string
26-
TLS TLSConfig
54+
TLS TLSConfiguration
2755
}
2856

29-
// TLSConfig describes the configuration properties for TLS Connections to the Kafka Brokers
30-
type TLSConfig struct {
57+
// TLSConfiguration describes the configuration properties for TLS Connections to the Kafka Brokers
58+
type TLSConfiguration struct {
3159
Enabled bool
3260
CertPath string
3361
KeyPath string
3462
CaPath string
3563
}
3664

3765
// GetTLSConfig creates TLS Configuration
38-
func (tlsConfig *TLSConfig) GetTLSConfig() (*tls.Config, error) {
39-
rootCerts, err := tlsConfig.loadCertificate()
66+
func (tlsConfig TLSConfiguration) GetTLSConfig() (*tls.Config, error) {
67+
ca, err := loadCA(tlsConfig.CaPath)
4068
if err != nil {
41-
return nil, err
69+
return nil, errors.Wrapf(err, "error reading ca")
4270
}
43-
clientPrivateKey, err := tlsConfig.loadPrivateKey()
71+
72+
cert, err := tls.LoadX509KeyPair(tlsConfig.KeyPath, tlsConfig.CertPath)
4473
if err != nil {
45-
return nil, err
74+
return nil, errors.Wrap(err, "error loading certificate")
4675
}
76+
4777
return &tls.Config{
48-
RootCAs: rootCerts,
49-
Certificates: []tls.Certificate{*clientPrivateKey},
78+
RootCAs: ca,
79+
Certificates: []tls.Certificate{cert},
5080
}, nil
51-
5281
}
5382

54-
// loadCertificate is used to load root certification
55-
func (tlsConfig *TLSConfig) loadCertificate() (*x509.CertPool, error) {
56-
caCert, err := ioutil.ReadFile(tlsConfig.CaPath)
83+
func loadCA(caPath string) (*x509.CertPool, error) {
84+
caBytes, err := ioutil.ReadFile(caPath)
5785
if err != nil {
58-
return nil, err
86+
return nil, errors.Wrapf(err, "error reading caFile %s", caPath)
5987
}
6088
certificates := x509.NewCertPool()
61-
certificates.AppendCertsFromPEM(caCert)
89+
if ok := certificates.AppendCertsFromPEM(caBytes); !ok {
90+
return nil, errors.Errorf("no ca certificates could be parsed")
91+
}
6292
return certificates, nil
6393
}
6494

65-
// loadPrivateKey is used to load the private certificate and key for TLS
66-
func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {
67-
privateKey, err := tls.LoadX509KeyPair(tlsConfig.CertPath, tlsConfig.KeyPath)
68-
if err != nil {
69-
return nil, err
70-
}
71-
return &privateKey, nil
95+
func AddFlags(prefix string, flagSet *flag.FlagSet) {
96+
flagSet.String(
97+
prefix+SuffixBrokers,
98+
DefaultBroker,
99+
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
100+
flagSet.Bool(
101+
prefix+SuffixTLS,
102+
DefaultTLS,
103+
"Enable TLS for the Kafka connection")
104+
flagSet.String(
105+
prefix+SuffixCA,
106+
DefaultCAPath,
107+
"Path to the TLS CA for the Kafka connection")
108+
flagSet.String(
109+
prefix+SuffixCert,
110+
DefaultCertPath,
111+
"Path to the TLS Certificate for the Kafka connection")
112+
flagSet.String(
113+
prefix+SuffixKey,
114+
DefaultKeyPath,
115+
"Path to the TLS Key for the Kafka connection")
116+
117+
}
118+
119+
func (c *Configuration) InitFromViper(prefix string, v *viper.Viper) {
120+
c.Brokers = strings.Split(stripWhiteSpace(v.GetString(prefix+SuffixBrokers)), ",")
121+
c.TLS.Enabled = v.GetBool(prefix + SuffixTLS)
122+
c.TLS.CaPath = v.GetString(prefix + SuffixCA)
123+
c.TLS.CertPath = v.GetString(prefix + SuffixCert)
124+
c.TLS.KeyPath = v.GetString(prefix + SuffixKey)
125+
}
126+
127+
// stripWhiteSpace removes all whitespace characters from a string
128+
func stripWhiteSpace(str string) string {
129+
return strings.Replace(str, " ", "", -1)
72130
}

pkg/kafka/config/config_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package config
2+
3+
import (
4+
"github.com/stretchr/testify/assert"
5+
"testing"
6+
)
7+
8+
const nonExistantFilepath = "/hope/that/this/does/not/exist"
9+
10+
func TestTLSConfiguration(t *testing.T) {
11+
_, err := TLSConfiguration{
12+
Enabled: true,
13+
CertPath: nonExistantFilepath,
14+
KeyPath: nonExistantFilepath,
15+
CaPath: nonExistantFilepath,
16+
}.GetTLSConfig()
17+
18+
assert.Error(t, err)
19+
}

pkg/kafka/consumer/config.go

+44-2
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,30 @@
1515
package consumer
1616

1717
import (
18+
"flag"
19+
"github.com/spf13/viper"
1820
"io"
1921

2022
"github.com/bsm/sarama-cluster"
2123
"github.com/pkg/errors"
2224

23-
"github.com/jaegertracing/jaeger/pkg/kafka/config"
25+
kafkaConfig "github.com/jaegertracing/jaeger/pkg/kafka/config"
26+
)
27+
28+
const (
29+
// SuffixTopic is a suffix for the topic flag
30+
SuffixTopic = ".topic"
31+
// SuffixGroupID is a suffix for the group-id flag
32+
SuffixGroupID = ".group-id"
33+
// SuffixClientID is a suffix for the client-id flag
34+
SuffixClientID = ".client-id"
35+
36+
// DefaultTopic is the default kafka topic
37+
DefaultTopic = "jaeger-spans"
38+
// DefaultGroupID is the default consumer Group ID
39+
DefaultGroupID = "jaeger-ingester"
40+
// DefaultClientID is the default consumer Client ID
41+
DefaultClientID = "jaeger-ingester"
2442
)
2543

2644
// Consumer is an interface to features of Sarama that are necessary for the consumer
@@ -37,7 +55,7 @@ type Builder interface {
3755

3856
// Configuration describes the configuration properties needed to create a Kafka consumer
3957
type Configuration struct {
40-
config.Configuration
58+
kafkaConfig.Configuration
4159
Topic string
4260
GroupID string
4361
ClientID string
@@ -59,3 +77,27 @@ func (c *Configuration) NewConsumer() (Consumer, error) {
5977
}
6078
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
6179
}
80+
81+
func AddFlags(prefix string, flagSet *flag.FlagSet) {
82+
kafkaConfig.AddFlags(prefix, flagSet)
83+
flagSet.String(
84+
prefix+SuffixTopic,
85+
DefaultTopic,
86+
"The name of the kafka topic to consume from")
87+
flagSet.String(
88+
prefix+SuffixGroupID,
89+
DefaultGroupID,
90+
"The Consumer Group the consumer will use")
91+
flagSet.String(
92+
prefix+SuffixClientID,
93+
DefaultClientID,
94+
"The Consumer Client ID the consumer")
95+
}
96+
97+
func (c *Configuration) InitFromViper(prefix string, v *viper.Viper) {
98+
c.Configuration.InitFromViper(prefix, v)
99+
100+
c.Topic = v.GetString(prefix + SuffixTopic)
101+
c.GroupID = v.GetString(prefix + SuffixGroupID)
102+
c.ClientID = v.GetString(prefix + SuffixClientID)
103+
}

0 commit comments

Comments
 (0)