Skip to content

Commit 11f2a9f

Browse files
committed
[FAB-10229] Address comments from CR21683
Change-Id: Id5bfe0c5bc4eaf7041d0014687d73d729bffb652 Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent 7400cc1 commit 11f2a9f

File tree

2 files changed

+20
-35
lines changed

2 files changed

+20
-35
lines changed

integration/runner/kafka.go

+14-26
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
128128

129129
config := &docker.Config{
130130
Image: k.Image,
131-
Env: k.setEnv(),
131+
Env: k.buildEnv(),
132132
}
133133

134134
networkingConfig := &docker.NetworkingConfig{
@@ -201,33 +201,21 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
201201
}
202202
}
203203

204-
func (k *Kafka) setEnv() []string {
204+
func (k *Kafka) buildEnv() []string {
205205
env := []string{
206206
"KAFKA_LOG_RETENTION_MS=-1",
207-
fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d",
208-
k.MessageMaxBytes),
209-
fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d",
210-
k.ReplicaFetchMaxBytes),
211-
fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s",
212-
strconv.FormatBool(k.UncleanLeaderElectionEnable)),
213-
fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d",
214-
k.DefaultReplicationFactor),
215-
fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d",
216-
k.MinInsyncReplicas),
217-
fmt.Sprintf("KAFKA_BROKER_ID=%d",
218-
k.BrokerID),
219-
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s",
220-
k.ZookeeperConnect),
221-
fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d",
222-
k.ReplicaFetchResponseMaxBytes),
223-
fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093",
224-
k.HostPort, k.NetworkName, k.Name),
225-
fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093",
226-
k.NetworkName),
227-
fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT",
228-
k.NetworkName),
229-
fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s",
230-
k.NetworkName),
207+
fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d", k.MessageMaxBytes),
208+
fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d", k.ReplicaFetchMaxBytes),
209+
fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s", strconv.FormatBool(k.UncleanLeaderElectionEnable)),
210+
fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d", k.DefaultReplicationFactor),
211+
fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d", k.MinInsyncReplicas),
212+
fmt.Sprintf("KAFKA_BROKER_ID=%d", k.BrokerID),
213+
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZookeeperConnect),
214+
fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d", k.ReplicaFetchResponseMaxBytes),
215+
fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093", k.HostPort, k.NetworkName, k.Name),
216+
fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093", k.NetworkName),
217+
fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT", k.NetworkName),
218+
fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s", k.NetworkName),
231219
}
232220
return env
233221
}

integration/runner/kafka_test.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,9 @@ import (
2323

2424
var _ = Describe("Kafka Runner", func() {
2525
var (
26-
err error
27-
client *docker.Client
28-
network *docker.Network
29-
networkName string
26+
client *docker.Client
27+
network *docker.Network
3028

31-
errBuffer *gbytes.Buffer
3229
outBuffer *gbytes.Buffer
3330
kafka *runner.Kafka
3431
zookeeper *runner.Zookeeper
@@ -37,15 +34,15 @@ var _ = Describe("Kafka Runner", func() {
3734
)
3835

3936
BeforeEach(func() {
40-
errBuffer = gbytes.NewBuffer()
4137
outBuffer = gbytes.NewBuffer()
4238
process = nil
4339

40+
var err error
4441
client, err = docker.NewClientFromEnv()
4542
Expect(err).NotTo(HaveOccurred())
4643

4744
// Create a network
48-
networkName = runner.UniqueName()
45+
networkName := runner.UniqueName()
4946
network, err = client.CreateNetwork(
5047
docker.CreateNetworkOptions{
5148
Name: networkName,
@@ -65,7 +62,7 @@ var _ = Describe("Kafka Runner", func() {
6562

6663
kafka = &runner.Kafka{
6764
Name: "kafka1",
68-
ErrorStream: io.MultiWriter(errBuffer, GinkgoWriter),
65+
ErrorStream: GinkgoWriter,
6966
OutputStream: io.MultiWriter(outBuffer, GinkgoWriter),
7067
ZookeeperConnect: "zookeeper0:2181",
7168
BrokerID: 1,
@@ -83,7 +80,7 @@ var _ = Describe("Kafka Runner", func() {
8380
Expect(err).NotTo(HaveOccurred())
8481

8582
if network != nil {
86-
client.RemoveNetwork(networkName)
83+
client.RemoveNetwork(network.Name)
8784
}
8885
})
8986

0 commit comments

Comments
 (0)