Skip to content

Commit f00890a

Browse files
lhaskinssykesm
authored andcommitted
[FAB-9907] Add Zookeeper Runner
This adds the zookeeper runner to the suite. Note: this Runner uses docker containers for executing the binary. Change-Id: I090ef11e366da5049f917a8df1c5da2e2c61f0dd Signed-off-by: Latitia M Haskins <latitia.haskins@gmail.com>
1 parent e9b3d0b commit f00890a

File tree

3 files changed

+391
-0
lines changed

3 files changed

+391
-0
lines changed

integration/runner/zookeeper.go

+241
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package runner
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"io"
13+
"net"
14+
"os"
15+
"sync"
16+
"time"
17+
18+
docker "github.com/fsouza/go-dockerclient"
19+
"github.com/pkg/errors"
20+
"github.com/tedsuo/ifrit"
21+
)
22+
23+
const ZookeeperDefaultImage = "hyperledger/fabric-zookeeper:latest"
24+
25+
type Zookeeper struct {
26+
Client *docker.Client
27+
Image string
28+
HostIP string
29+
HostPort []int
30+
ContainerPorts []docker.Port
31+
Name string
32+
StartTimeout time.Duration
33+
34+
NetworkID string
35+
NetworkName string
36+
ClientPort docker.Port
37+
LeaderPort docker.Port
38+
PeerPort docker.Port
39+
ZooMyID int
40+
ZooServers string
41+
42+
ErrorStream io.Writer
43+
OutputStream io.Writer
44+
45+
containerID string
46+
hostAddress string
47+
containerAddress string
48+
address string
49+
50+
mutex sync.Mutex
51+
stopped bool
52+
}
53+
54+
func (z *Zookeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
55+
if z.Image == "" {
56+
z.Image = ZookeeperDefaultImage
57+
}
58+
59+
if z.Name == "" {
60+
z.Name = DefaultNamer()
61+
}
62+
63+
if z.HostIP == "" {
64+
z.HostIP = "127.0.0.1"
65+
}
66+
67+
if z.ContainerPorts == nil {
68+
if z.ClientPort == docker.Port("") {
69+
z.ClientPort = docker.Port("2181/tcp")
70+
}
71+
if z.LeaderPort == docker.Port("") {
72+
z.LeaderPort = docker.Port("3888/tcp")
73+
}
74+
if z.PeerPort == docker.Port("") {
75+
z.PeerPort = docker.Port("2888/tcp")
76+
}
77+
78+
z.ContainerPorts = []docker.Port{
79+
z.ClientPort,
80+
z.LeaderPort,
81+
z.PeerPort,
82+
}
83+
}
84+
85+
if z.StartTimeout == 0 {
86+
z.StartTimeout = DefaultStartTimeout
87+
}
88+
89+
if z.ZooMyID == 0 {
90+
z.ZooMyID = 1
91+
}
92+
93+
if z.Client == nil {
94+
client, err := docker.NewClientFromEnv()
95+
if err != nil {
96+
return err
97+
}
98+
z.Client = client
99+
}
100+
101+
config := &docker.Config{
102+
Image: z.Image,
103+
Env: []string{
104+
fmt.Sprintf("ZOO_MY_ID=%d", z.ZooMyID),
105+
fmt.Sprintf("ZOO_SERVERS=%s", z.ZooServers),
106+
},
107+
}
108+
109+
containerOptions := docker.CreateContainerOptions{
110+
Name: z.Name,
111+
HostConfig: &docker.HostConfig{
112+
AutoRemove: true,
113+
},
114+
Config: config,
115+
}
116+
117+
if z.NetworkName != "" && z.NetworkID != "" {
118+
networkingConfig := &docker.NetworkingConfig{
119+
EndpointsConfig: map[string]*docker.EndpointConfig{
120+
z.NetworkName: &docker.EndpointConfig{
121+
NetworkID: z.NetworkID,
122+
},
123+
},
124+
}
125+
126+
containerOptions.NetworkingConfig = networkingConfig
127+
}
128+
129+
container, err := z.Client.CreateContainer(containerOptions)
130+
if err != nil {
131+
return err
132+
}
133+
z.containerID = container.ID
134+
135+
err = z.Client.StartContainer(container.ID, nil)
136+
if err != nil {
137+
return err
138+
}
139+
defer z.Stop()
140+
141+
container, err = z.Client.InspectContainer(container.ID)
142+
if err != nil {
143+
return err
144+
}
145+
146+
z.containerAddress = net.JoinHostPort(
147+
container.NetworkSettings.IPAddress,
148+
z.ContainerPorts[0].Port(),
149+
)
150+
151+
streamCtx, streamCancel := context.WithCancel(context.Background())
152+
defer streamCancel()
153+
go z.streamLogs(streamCtx)
154+
155+
containerExit := z.wait()
156+
ctx, cancel := context.WithTimeout(context.Background(), z.StartTimeout)
157+
defer cancel()
158+
159+
select {
160+
case <-ctx.Done():
161+
return errors.Wrapf(ctx.Err(), "zookeeper in container %s did not start", z.containerID)
162+
case <-containerExit:
163+
return errors.New("container exited before ready")
164+
default:
165+
z.address = z.containerAddress
166+
}
167+
168+
close(ready)
169+
170+
select {
171+
case err := <-containerExit:
172+
return err
173+
case <-sigCh:
174+
return z.Stop()
175+
}
176+
}
177+
178+
func (z *Zookeeper) wait() <-chan error {
179+
exitCh := make(chan error)
180+
go func() {
181+
if _, err := z.Client.WaitContainer(z.containerID); err != nil {
182+
exitCh <- err
183+
}
184+
}()
185+
186+
return exitCh
187+
}
188+
189+
func (z *Zookeeper) streamLogs(ctx context.Context) error {
190+
if z.ErrorStream == nil && z.OutputStream == nil {
191+
return nil
192+
}
193+
194+
logOptions := docker.LogsOptions{
195+
Context: ctx,
196+
Container: z.ContainerID(),
197+
ErrorStream: z.ErrorStream,
198+
OutputStream: z.OutputStream,
199+
Stderr: z.ErrorStream != nil,
200+
Stdout: z.OutputStream != nil,
201+
Follow: true,
202+
}
203+
return z.Client.Logs(logOptions)
204+
}
205+
206+
func (z *Zookeeper) ContainerID() string {
207+
return z.containerID
208+
}
209+
210+
func (z *Zookeeper) ContainerAddress() string {
211+
return z.containerAddress
212+
}
213+
214+
func (z *Zookeeper) Start() error {
215+
p := ifrit.Invoke(z)
216+
217+
select {
218+
case <-p.Ready():
219+
return nil
220+
case err := <-p.Wait():
221+
return err
222+
}
223+
}
224+
225+
func (z *Zookeeper) Stop() error {
226+
z.mutex.Lock()
227+
if z.stopped {
228+
z.mutex.Unlock()
229+
return errors.Errorf("container %s already stopped", z.Name)
230+
}
231+
z.stopped = true
232+
z.mutex.Unlock()
233+
234+
err := z.Client.StopContainer(z.containerID, 0)
235+
if err != nil {
236+
return err
237+
}
238+
239+
_, err = z.Client.PruneVolumes(docker.PruneVolumesOptions{})
240+
return err
241+
}

integration/runner/zookeeper_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package runner_test
8+
9+
import (
10+
"io"
11+
"io/ioutil"
12+
"net"
13+
"os"
14+
"syscall"
15+
"time"
16+
17+
"github.com/fsouza/go-dockerclient"
18+
"github.com/hyperledger/fabric/integration/runner"
19+
. "github.com/onsi/ginkgo"
20+
. "github.com/onsi/gomega"
21+
"github.com/onsi/gomega/gbytes"
22+
"github.com/tedsuo/ifrit"
23+
)
24+
25+
var _ = Describe("Zookeeper Runner", func() {
26+
var (
27+
errBuffer *gbytes.Buffer
28+
outBuffer *gbytes.Buffer
29+
zookeeper *runner.Zookeeper
30+
31+
process ifrit.Process
32+
)
33+
34+
BeforeEach(func() {
35+
client, err := docker.NewClientFromEnv()
36+
Expect(err).NotTo(HaveOccurred())
37+
38+
errBuffer = gbytes.NewBuffer()
39+
outBuffer = gbytes.NewBuffer()
40+
zookeeper = &runner.Zookeeper{
41+
Name: "zookeeper0",
42+
StartTimeout: time.Second,
43+
ErrorStream: io.MultiWriter(errBuffer, GinkgoWriter),
44+
OutputStream: io.MultiWriter(outBuffer, GinkgoWriter),
45+
Client: client,
46+
}
47+
48+
process = nil
49+
})
50+
51+
AfterEach(func() {
52+
if process != nil {
53+
process.Signal(syscall.SIGTERM)
54+
}
55+
tempDir, _ := ioutil.TempDir("", "zk-runner")
56+
os.RemoveAll(tempDir)
57+
})
58+
59+
It("starts and stops a docker container with the specified image", func() {
60+
By("using a real docker daemon")
61+
zookeeper.Client = nil
62+
zookeeper.StartTimeout = 5 * time.Second
63+
64+
By("starting zookeeper")
65+
process = ifrit.Invoke(zookeeper)
66+
Eventually(process.Ready(), runner.DefaultStartTimeout).Should(BeClosed())
67+
Consistently(process.Wait(), 5*time.Second).ShouldNot(Receive())
68+
69+
By("inspecting the container by name")
70+
container, err := zookeeper.Client.InspectContainer("zookeeper0")
71+
Expect(err).NotTo(HaveOccurred())
72+
73+
Expect(container.Name).To(Equal("/zookeeper0"))
74+
Expect(container.State.Status).To(Equal("running"))
75+
Expect(container.Config).NotTo(BeNil())
76+
Expect(container.Config.Image).To(Equal("hyperledger/fabric-zookeeper:latest"))
77+
Expect(container.ID).To(Equal(zookeeper.ContainerID()))
78+
79+
Expect(zookeeper.ContainerAddress()).To(Equal(net.JoinHostPort(container.NetworkSettings.IPAddress, "2181")))
80+
81+
By("getting the container logs")
82+
Eventually(errBuffer, 5*time.Second).Should(gbytes.Say(`Using config: /conf/zoo.cfg`))
83+
Eventually(outBuffer, 5*time.Second).Should(gbytes.Say(`binding to port 0.0.0.0/0.0.0.0:2181`))
84+
85+
By("terminating the container")
86+
err = zookeeper.Stop()
87+
Expect(err).NotTo(HaveOccurred())
88+
})
89+
90+
It("starts and stops multiple zookeepers", func() {
91+
client, err := docker.NewClientFromEnv()
92+
zk1 := &runner.Zookeeper{
93+
Name: "zookeeper1",
94+
ZooMyID: 1,
95+
ZooServers: "server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888",
96+
StartTimeout: 5 * time.Second,
97+
Client: client,
98+
}
99+
err = zk1.Start()
100+
Expect(err).NotTo(HaveOccurred())
101+
102+
zk2 := &runner.Zookeeper{
103+
Name: "zookeeper2",
104+
ZooMyID: 2,
105+
ZooServers: "server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888",
106+
StartTimeout: 5 * time.Second,
107+
Client: client,
108+
}
109+
err = zk2.Start()
110+
Expect(err).NotTo(HaveOccurred())
111+
112+
zk3 := &runner.Zookeeper{
113+
Name: "zookeeper3",
114+
ZooMyID: 3,
115+
ZooServers: "server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888",
116+
StartTimeout: 5 * time.Second,
117+
Client: client,
118+
}
119+
err = zk3.Start()
120+
Expect(err).NotTo(HaveOccurred())
121+
122+
container, err := zk1.Client.InspectContainer("zookeeper1")
123+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_MY_ID=1")))
124+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_SERVERS=server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888")))
125+
container, err = zk2.Client.InspectContainer("zookeeper2")
126+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_MY_ID=2")))
127+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_SERVERS=server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888")))
128+
container, err = zk3.Client.InspectContainer("zookeeper3")
129+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_MY_ID=3")))
130+
Expect(container.Config.Env).To(ContainElement(ContainSubstring("ZOO_SERVERS=server.1=zookeeper1:2888:3888 server.2=zookeeper2:2888:3888 server.3=zookeeper3:2888:3888")))
131+
132+
err = zk3.Stop()
133+
Expect(err).NotTo(HaveOccurred())
134+
err = zk2.Stop()
135+
Expect(err).NotTo(HaveOccurred())
136+
err = zk1.Stop()
137+
Expect(err).NotTo(HaveOccurred())
138+
})
139+
})

0 commit comments

Comments
 (0)