Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Adding scram authentication for kafka #2110

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@ require (
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.3.2 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.3.0
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -349,6 +349,7 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -465,6 +466,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -525,6 +527,7 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@@ -587,8 +590,10 @@ github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5 h1:Xim2mBRFdXzXmKRO
github.com/vektra/mockery v0.0.0-20181123154057-e78b021dcbb5/go.mod h1:ppEjwdhyy7Y31EnHRDm1JkChoC7LXIJ7Ex0VYLWtZtQ=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad h1:W0LEBv82YCGEtcmPA3uNZBI33/qF//HAAs3MawDjRa0=
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad/go.mod h1:Hy8o65+MXnS6EwGElrSRjUzQDLXreJlzYLlWiHtt8hM=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
9 changes: 9 additions & 0 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
@@ -30,13 +30,15 @@ const (
kerberos = "kerberos"
tls = "tls"
plaintext = "plaintext"
scram = "scram"
)

var authTypes = []string{
none,
kerberos,
tls,
plaintext,
scram,
}

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
@@ -45,6 +47,7 @@ type AuthenticationConfig struct {
Kerberos KerberosConfig `mapstructure:"kerberos"`
TLS tlscfg.Options `mapstructure:"tls"`
PlainText PlainTextConfig `mapstructure:"plaintext"`
SCRAM ScramConfig `mapstructure:"scram"`
}

//SetConfiguration set configure authentication into sarama config structure
@@ -70,6 +73,8 @@ func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config
case plaintext:
setPlainTextConfiguration(&config.PlainText, saramaConfig)
return nil
case scram:
return setSCRAMConfiguration(&config.SCRAM, saramaConfig)
default:
return fmt.Errorf("Unknown/Unsupported authentication method %s to kafka cluster", config.Authentication)
}
@@ -99,4 +104,8 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.

config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)

config.SCRAM.UserName = v.GetString(configPrefix + scramPrefix + suffixScramUserName)
config.SCRAM.Password = v.GetString(configPrefix + scramPrefix + suffixScramPassword)
config.SCRAM.Algorithm = v.GetString(configPrefix + scramPrefix + suffixScramAlgorithm)
}
24 changes: 24 additions & 0 deletions pkg/kafka/auth/options.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,14 @@ const (

defaultPlainTextUserName = ""
defaultPlainTextPassword = ""

// Scram configuration options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of the convention, but Go rules state that acronyms should be all in caps, so, SCRAM everywhere in the next few lines.

scramPrefix = ".scram"
suffixScramUserName = ".username"
suffixScramAlgorithm = ".algorithm"
suffixScramPassword = ".password"

defaultScramAlgorithm = "sha512"
)

func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) {
@@ -93,6 +101,21 @@ func addPlainTextFlags(configPrefix string, flagSet *flag.FlagSet) {
"The plaintext Password for SASL/PLAIN authentication")
}

func addScramFlags(configPrefix string, flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+scramPrefix+suffixScramUserName,
"",
"Scram username used to authenticate with the client")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well

flagSet.String(
configPrefix+scramPrefix+suffixScramPassword,
"",
"Scram password used to authenticat with the client")
flagSet.String(
configPrefix+scramPrefix+suffixScramAlgorithm,
defaultScramAlgorithm,
"Scram algorithm, 'sha256' or 'sha512'")
}

// AddFlags add configuration flags to a flagSet.
func AddFlags(configPrefix string, flagSet *flag.FlagSet) {
flagSet.String(
@@ -110,4 +133,5 @@ func AddFlags(configPrefix string, flagSet *flag.FlagSet) {
tlsClientConfig.AddFlags(flagSet)

addPlainTextFlags(configPrefix, flagSet)
addScramFlags(configPrefix, flagSet)
}
96 changes: 96 additions & 0 deletions pkg/kafka/auth/scram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"crypto/sha512"
"fmt"
"hash"

"github.com/Shopify/sarama"
scrampkg "github.com/xdg/scram"
)

// ScramConfig describes the configuration properties required for the SCRAM handshake
// between the collector and the pipeline
type ScramConfig struct {
UserName string `mapstructure:"username"`
Password string `mapstructure:"password"`
Algorithm string `mapstructure:"algorithm"`
}

// SetSCRAMConfiguration ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... ? :-)

func setSCRAMConfiguration(config *ScramConfig, saramaConfig *sarama.Config) error {
var fn func() sarama.SCRAMClient

var mechanism sarama.SASLMechanism

switch config.Algorithm {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Matter of taste I guess, but I think the version from this PR more readable: open-telemetry/opentelemetry-collector#2322

case "sha512":
fn = func() sarama.SCRAMClient {
return &scramClient{
HashGeneratorFcn: func() hash.Hash { return sha512.New() },
}
}
mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
case "sha256":
fn = func() sarama.SCRAMClient {
return &scramClient{
HashGeneratorFcn: scrampkg.SHA256,
}
}
mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
default:
return fmt.Errorf("invalid SHA algorithm '%s': can be either 'sha256' or 'sha512'", config.Algorithm)
}
saramaConfig.Net.SASL.SCRAMClientGeneratorFunc = fn
saramaConfig.Net.SASL.Mechanism = mechanism
saramaConfig.Net.SASL.Enable = true
saramaConfig.Net.SASL.User = config.UserName
saramaConfig.Net.SASL.Password = config.Password

return nil
}

type scramClient struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment I had in the similar PR from OpenTelemetry: you should probably do a type assert here, ensuring that this is a sarama.SCRAMClient

*scrampkg.Client
*scrampkg.ClientConversation
scrampkg.HashGeneratorFcn
}

// Begin uses a username password and authid to generate a new client and instantiate a new conversation
func (client *scramClient) Begin(userName, password, authzID string) (err error) {
client.Client, err = client.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return fmt.Errorf("Begin method failed on: %s", err)
}
client.ClientConversation = client.Client.NewConversation()
return nil
}

// Step takes a challenge string
func (client *scramClient) Step(challenge string) (response string, err error) {
response, err = client.ClientConversation.Step(challenge)
if err != nil {
return "", fmt.Errorf("Step method failed on: %s", err)

}
return response, nil
}

// Done returns a bool based on a completed conversation
func (client *scramClient) Done() bool {
return client.ClientConversation.Done()
}
51 changes: 51 additions & 0 deletions pkg/kafka/auth/scram_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth_test

import (
"flag"
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

func TestSCRAMClientFlags(t *testing.T) {

addFlags := func(flagSet *flag.FlagSet) {
flagSet.String("--kafka.scram.username", "fakeuser", "")
flagSet.String("--kafka.scram.password", "fakepassword", "")
flagSet.String("--kafka.scram.algorithm", "sha256", "")
}

v, _ := config.Viperize(addFlags)

authCfg := auth.AuthenticationConfig{
Authentication: "scram",
}

authCfg.InitFromViper("--kafka", v)
// check to see if the configs are the same
assert.Equal(t, auth.ScramConfig{
UserName: "fakeuser",
Password: "fakepassword",
Algorithm: "sha256",
}, authCfg.SCRAM)
}

// testing Begin, Step, and Done require a network connection to test,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it hard to mock this?

// they otherwise will have a perpetual connection that cannot be closed