Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Closed & Outmoded]Add that support Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or SCRAM-SHA-512 mechanism #2723

Closed
wants to merge 40 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b712c9d
add metrics that show agent connection collector status
Nov 26, 2020
109617c
update comment
WalkerWang731 Nov 26, 2020
eee31fd
exec make fmt
WalkerWang731 Nov 26, 2020
82ed46e
simplify function and add testing relevant code in the builder_test.go
WalkerWang731 Nov 28, 2020
ff406aa
add comment in connect_metrics.go
WalkerWang731 Nov 28, 2020
c2a64f4
simplify code and changed use expvar to show target
WalkerWang731 Nov 30, 2020
1429e76
simplify code and changed use expvar to show target
WalkerWang731 Nov 30, 2020
dd68627
exec make fmt
WalkerWang731 Nov 30, 2020
83452e4
Fix collector panic due to sarama sdk returning nil error (#2654)
Betula-L Nov 26, 2020
68c0b29
Fix flaky tbuffered server test (#2635)
pkositsyn Nov 27, 2020
f651162
Add github actions for integration tests (#2649)
Ashmita152 Nov 27, 2020
8ee024e
Clean-up GH action names (#2661)
yurishkuro Nov 27, 2020
111efb7
Fix for failures in badger integration tests (#2660)
Ashmita152 Nov 27, 2020
e4da609
Add protogen validation test (#2662)
Ashmita152 Nov 27, 2020
9ad6033
Add github action for jaeger all-in-one image (#2663)
Ashmita152 Nov 29, 2020
faa61ac
Update comment that looks confusing during builds
yurishkuro Nov 29, 2020
71755cc
Use GitHub actions based build badges
yurishkuro Nov 29, 2020
0274fcf
Fix and minor improvements to all-in-one github action (#2667)
Ashmita152 Nov 30, 2020
c222b0d
Fix docker login issue with all-in-one build (#2668)
Ashmita152 Nov 30, 2020
01d1e9c
Fix issue with all-in-one build (#2669)
Ashmita152 Nov 30, 2020
fb1f57a
Update cmd/agent/app/reporter/connect_metrics.go
WalkerWang731 Nov 30, 2020
ba18453
Update cmd/agent/app/reporter/connect_metrics.go
WalkerWang731 Nov 30, 2020
68e6d76
simplify the code that remove ConnectMetricsParams{} and integrate Co…
WalkerWang731 Nov 30, 2020
ac84830
simplify the code that remove ConnectMetricsParams{} and integrate Co…
WalkerWang731 Nov 30, 2020
767623e
Merge branch 'master' into master
WalkerWang731 Nov 30, 2020
a1ff642
merage from the lastest master branch and exec make fmt
Nov 30, 2020
d07f8b8
add comment on ConnectMetrics
WalkerWang731 Nov 30, 2020
32c1e9b
Merge branch 'master' into master
WalkerWang731 Nov 30, 2020
512e61e
Merge branch 'master' into master
WalkerWang731 Dec 1, 2020
1222f7c
Merge branch 'master' into master
WalkerWang731 Dec 2, 2020
58113f7
Merge branch 'master' into master
WalkerWang731 Dec 2, 2020
a75e032
Merge branch 'master' into master
WalkerWang731 Dec 3, 2020
6d69e6e
Merge pull request #1 from jaegertracing/master
WalkerWang731 Jan 12, 2021
6d21a5d
added that suppot Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or…
Jan 12, 2021
1731382
added that suppot Kafka SASL/PLAIN authentication of SCRAM-SHA-256 or…
Jan 12, 2021
4a741a0
Merge branch 'add_kafka_mechanism' of https://github.com/WalkerWang73…
WalkerWang731 Jan 12, 2021
c6f4451
add comment on XDGSCRAMClient
Jan 12, 2021
7b2052b
update TestTLSFlags() expected on options_test.go
WalkerWang731 Jan 12, 2021
81adbb2
sync #2721 plaintext as supported kafka auth option
WalkerWang731 Jan 12, 2021
1feee6f
update TestTLSFlags() expected on flags_test.go
WalkerWang731 Jan 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions cmd/agent/app/reporter/connect_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"time"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)

type connectMetrics struct {
// used for reflect current connection stability
Reconnects metrics.Counter `metric:"collector_reconnects" help:"Number of successful connections (including reconnects) to the collector."`

// Connection status that jaeger-agent to jaeger-collector, 1 is connected, 0 is disconnected
Status metrics.Gauge `metric:"collector_connected" help:"Status of connection between the agent and the collector; 1 is connected, 0 is disconnected"`
}

// ConnectMetrics include connectMetrics necessary params if want to modify metrics of connectMetrics, must via ConnectMetrics API
type ConnectMetrics struct {
Logger *zap.Logger // required
MetricsFactory metrics.Factory // required
ExpireFrequency time.Duration
ExpireTTL time.Duration
connectMetrics *connectMetrics
}

// NewConnectMetrics will be initialize ConnectMetrics
func (r *ConnectMetrics) NewConnectMetrics() {
if r.ExpireFrequency == 0 {
r.ExpireFrequency = defaultExpireFrequency
}
if r.ExpireTTL == 0 {
r.ExpireTTL = defaultExpireTTL
}

r.connectMetrics = new(connectMetrics)
r.MetricsFactory = r.MetricsFactory.Namespace(metrics.NSOptions{Name: "connection_status"})
metrics.MustInit(r.connectMetrics, r.MetricsFactory, nil)
}

// OnConnectionStatusChange used for pass the status parameter when connection is changed
// 0 is disconnected, 1 is connected
// For quick view status via use `sum(jaeger_agent_connection_status_collector_connected{}) by (instance) > bool 0`
func (r *ConnectMetrics) OnConnectionStatusChange(connected bool) {
if connected {
r.connectMetrics.Status.Update(1)
r.connectMetrics.Reconnects.Inc(1)
} else {
r.connectMetrics.Status.Update(0)
}
}
81 changes: 81 additions & 0 deletions cmd/agent/app/reporter/connect_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics/metricstest"
)

type connectMetricsTest struct {
mf *metricstest.Factory
}

func testConnectMetrics(fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
testConnectMetricsWithParams(&ConnectMetrics{}, fn)
}

func testConnectMetricsWithParams(cm *ConnectMetrics, fn func(tr *connectMetricsTest, r *ConnectMetrics)) {
mf := metricstest.NewFactory(time.Hour)
cm.MetricsFactory = mf
cm.NewConnectMetrics()

tr := &connectMetricsTest{
mf: mf,
}

fn(tr, cm)
}

func testCollectorConnected(r *ConnectMetrics) {
r.OnConnectionStatusChange(true)
}

func testCollectorAborted(r *ConnectMetrics) {
r.OnConnectionStatusChange(false)
}

func TestConnectMetrics(t *testing.T) {

testConnectMetrics(func(tr *connectMetricsTest, r *ConnectMetrics) {
getGauge := func() map[string]int64 {
_, gauges := tr.mf.Snapshot()
return gauges
}

getCount := func() map[string]int64 {
counts, _ := tr.mf.Snapshot()
return counts
}

// testing connect aborted
testCollectorAborted(r)
assert.EqualValues(t, 0, getGauge()["connection_status.collector_connected"])

// testing connect connected
testCollectorConnected(r)
assert.EqualValues(t, 1, getGauge()["connection_status.collector_connected"])
assert.EqualValues(t, 1, getCount()["connection_status.collector_reconnects"])

// testing reconnect counts
testCollectorAborted(r)
testCollectorConnected(r)
assert.EqualValues(t, 2, getCount()["connection_status.collector_reconnects"])

})
}
39 changes: 35 additions & 4 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ package grpc
import (
"context"
"errors"
"expvar"
"fmt"
"strings"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver"
Expand All @@ -43,6 +47,9 @@ type ConnBuilder struct {
DiscoveryMinPeers int
Notifier discovery.Notifier
Discoverer discovery.Discoverer

// for unit test and provide ConnectMetrics and outside call
ConnectMetrics *reporter.ConnectMetrics
}

// NewConnBuilder creates a new grpc connection builder.
Expand All @@ -51,7 +58,7 @@ func NewConnBuilder() *ConnBuilder {
}

// CreateConnection creates the gRPC connection
func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, error) {
func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) {
var dialOptions []grpc.DialOption
var dialTarget string
if b.TLS.Enabled { // user requested a secure connection
Expand Down Expand Up @@ -97,14 +104,38 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
return nil, err
}

go func(cc *grpc.ClientConn) {
if b.ConnectMetrics == nil {
cm := reporter.ConnectMetrics{
Logger: logger,
MetricsFactory: mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}}),
}
cm.NewConnectMetrics()
b.ConnectMetrics = &cm
}

go func(cc *grpc.ClientConn, cm *reporter.ConnectMetrics) {
logger.Info("Checking connection to collector")
var egt *expvar.String
r := expvar.Get("gRPCTarget")
if r == nil {
egt = expvar.NewString("gRPCTarget")
} else {
egt = r.(*expvar.String)
}

for {
s := cc.GetState()
if s == connectivity.Ready {
cm.OnConnectionStatusChange(true)
egt.Set(cc.Target())
} else {
cm.OnConnectionStatusChange(false)
}

logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(context.Background(), s)
}
}(conn)
}(conn, b.ConnectMetrics)

return conn, nil
}
33 changes: 30 additions & 3 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package grpc

import (
"context"
"expvar"
"net"
"strings"
"testing"
Expand All @@ -28,8 +29,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"

"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
Expand All @@ -46,6 +48,10 @@ var testCertKeyLocation = "../../../../../pkg/config/tlscfg/testdata/"

type noopNotifier struct{}

type connectMetricsTest struct {
mf *metricstest.Factory
}

func (noopNotifier) Register(chan<- []string) {}

func (noopNotifier) Unregister(chan<- []string) {}
Expand All @@ -59,7 +65,7 @@ func TestBuilderFromConfig(t *testing.T) {
t,
[]string{"127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
r, err := cfg.CreateConnection(zap.NewNop())
r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
assert.NotNil(t, r)
}
Expand Down Expand Up @@ -149,7 +155,17 @@ func TestBuilderWithCollectors(t *testing.T) {
cfg.Notifier = test.notifier
cfg.Discoverer = test.discoverer

conn, err := cfg.CreateConnection(zap.NewNop())
mf := metricstest.NewFactory(time.Hour)
cm := reporter.ConnectMetrics{
MetricsFactory: mf,
}
cm.NewConnectMetrics()
tr := &connectMetricsTest{
mf: mf,
}
cfg.ConnectMetrics = &cm

conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
if test.expectedError == "" {
require.NoError(t, err)
require.NotNil(t, conn)
Expand All @@ -161,10 +177,21 @@ func TestBuilderWithCollectors(t *testing.T) {
} else {
assert.True(t, conn.Target() == test.target)
}
if test.expectedState == "READY" {
counts, gauges := tr.mf.Snapshot()
assert.EqualValues(t, 1, gauges["connection_status.collector_connected"])
assert.EqualValues(t, 1, counts["connection_status.collector_reconnects"])
assert.Equal(t, test.target, expvar.Get("gRPCTarget").(*expvar.String).Value())
}
if test.expectedState == "TRANSIENT_FAILURE" {
_, gauges := tr.mf.Snapshot()
assert.EqualValues(t, 0, gauges["connection_status.collector_connected"])
}
} else {
require.Error(t, err)
assert.Contains(t, err.Error(), test.expectedError)
}

})
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type ProxyBuilder struct {

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
conn, err := builder.CreateConnection(logger)
conn, err := builder.CreateConnection(logger, mFactory)
if err != nil {
return nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,30 @@ func TestOptionsWithFlags(t *testing.T) {

func TestTLSFlags(t *testing.T) {
kerb := auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}
plain := auth.PlainTextConfig{UserName: "", Password: "", Mechanism: "PLAIN"}
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: kerb, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "kerberos", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: kerb, TLS: tlscfg.Options{Enabled: true}, PlainText: plain},
},
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.3.2 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
6 changes: 5 additions & 1 deletion pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
setKerberosConfiguration(&config.Kerberos, saramaConfig)
return nil
case plaintext:
setPlainTextConfiguration(&config.PlainText, saramaConfig)
err := setPlainTextConfiguration(&config.PlainText, saramaConfig)
if err != nil {
return err
}
return nil
default:
return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication)
Expand Down Expand Up @@ -99,4 +102,5 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.

config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
config.PlainText.Mechanism = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextMechanism)
}
Loading